From: brahma@apache.org
To: common-commits@hadoop.apache.org
Date: Tue, 19 Sep 2017 18:09:24 -0000
X-Mailer: ASF-Git Admin Mailer
Subject: [1/2] hadoop git commit: HDFS-11799. Introduce a config to allow
 setting up write pipeline with fewer nodes than replication factor.
 Contributed by Brahma Reddy Battula

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 d2386725f -> eddc58186
  refs/heads/branch-2.8 67ada5d48 -> a81167e2e


HDFS-11799. Introduce a config to allow setting up write pipeline with fewer nodes than replication factor.
Contributed by Brahma Reddy Battula
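For illustration only, and not part of this commit: the sketch below shows how a
client might opt in to the new behaviour through the standard Configuration and
FileSystem APIs. The NameNode URI, the output path, and the value 2 are
placeholders, not values taken from the patch.

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MinReplicationWriteExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // New key introduced by HDFS-11799: keep writing as long as at least
        // this many datanodes remain in the pipeline when no replacement
        // datanode can be found.
        conf.setInt(
            "dfs.client.block.write.replace-datanode-on-failure.min-replication",
            2);
        // The existing replacement policy still applies first; see
        // dfs.client.block.write.replace-datanode-on-failure.policy.
        try (FileSystem fs =
                 FileSystem.get(URI.create("hdfs://localhost:8020"), conf);
             FSDataOutputStream out =
                 fs.create(new Path("/tmp/min-replication-demo"))) {
          out.writeUTF("hello");
        }
      }
    }

With the default value of 0 the client keeps the old behaviour and the write
fails whenever a replacement datanode cannot be found.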
(cherry picked from commit fda1221c55101d97ac62e1ee4e3ddf9a915d5363)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/eddc5818
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/eddc5818
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/eddc5818

Branch: refs/heads/branch-2
Commit: eddc581860b9dfcec4814756e4c14b52abed2f7d
Parents: d238672
Author: Brahma Reddy Battula
Authored: Tue Sep 19 11:25:45 2017 +0530
Committer: Brahma Reddy Battula
Committed: Tue Sep 19 11:31:22 2017 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  13 +-
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  31 +-
 .../hdfs/client/HdfsClientConfigKeys.java       |   2 +
 .../src/main/resources/hdfs-default.xml         |  17 ++
 .../TestReplaceDatanodeFailureReplication.java  | 291 +++++++++++++++++++
 .../hadoop/tools/TestHdfsConfigFields.java      |   5 +-
 6 files changed, 355 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/eddc5818/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 591d604..75128ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -230,6 +230,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   final String clientName;
   final SocketFactory socketFactory;
   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
+  final short dtpReplaceDatanodeOnFailureReplication;
   private final FileSystem.Statistics stats;
   private final URI namenodeUri;
   private final Random r = new Random();
@@ -311,7 +312,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
-
+    this.dtpReplaceDatanodeOnFailureReplication = (short) conf
+        .getInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+                MIN_REPLICATION,
+            HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+                MIN_REPLICATION_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Sets " + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+              MIN_REPLICATION + " to "
+              + dtpReplaceDatanodeOnFailureReplication);
+    }
     this.ugi = UserGroupInformation.getCurrentUser();
 
     this.namenodeUri = nameNodeUri;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eddc5818/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 0438bb2..02fb44a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1355,7 +1355,36 @@ class DataStreamer extends Daemon {
       setPipeline(lb);
 
       //find the new datanode
-      final int d = findNewDatanode(original);
+      final int d;
+      try {
+        d = findNewDatanode(original);
+      } catch (IOException ioe) {
+        // check the minimal number of nodes available to decide whether to
+        // continue the write.
+
+        //if live block location datanodes is greater than or equal to
+        // HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        // MIN_REPLICATION threshold value, continue writing to the
+        // remaining nodes. Otherwise throw exception.
+        //
+        // If HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        // MIN_REPLICATION is set to 0 or less than zero, an exception will be
+        // thrown if a replacement could not be found.
+
+        if (dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && nodes.length
+            >= dfsClient.dtpReplaceDatanodeOnFailureReplication) {
+          DFSClient.LOG.warn(
+              "Failed to find a new datanode to add to the write pipeline, "
+                  + " continue to write to the pipeline with " + nodes.length
+                  + " nodes since it's no less than minimum replication: "
+                  + dfsClient.dtpReplaceDatanodeOnFailureReplication
+                  + " configured by "
+                  + BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION
+                  + ".", ioe);
+          return;
+        }
+        throw ioe;
+      }
       //transfer replica. pick a source from the original nodes
       final DatanodeInfo src = original[tried % original.length];
       final DatanodeInfo[] targets = {nodes[d]};

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eddc5818/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index a2dd9e3..872a838 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -334,6 +334,8 @@ public interface HdfsClientConfigKeys {
       String POLICY_DEFAULT = "DEFAULT";
       String BEST_EFFORT_KEY = PREFIX + "best-effort";
       boolean BEST_EFFORT_DEFAULT = false;
+      String MIN_REPLICATION = PREFIX + "min-replication";
+      short MIN_REPLICATION_DEFAULT = 0;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eddc5818/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 1862d50..d981af6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -637,6 +637,23 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.client.block.write.replace-datanode-on-failure.min-replication</name>
+  <value>0</value>
+  <description>
+    The minimum number of replications that are needed to not to fail
+    the write pipeline if new datanodes can not be found to replace
+    failed datanodes (could be due to network failure) in the write pipeline.
+    If the number of the remaining datanodes in the write pipeline is greater
+    than or equal to this property value, continue writing to the remaining nodes.
+    Otherwise throw exception.
+
+    If this is set to 0, an exception will be thrown, when a replacement
+    can not be found.
+    See also dfs.client.block.write.replace-datanode-on-failure.policy
+  </description>
+</property>
+
 <property>
   <name>dfs.blockreport.intervalMsec</name>
   <value>21600000</value>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eddc5818/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java
new file mode 100644
index 0000000..9591cb4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify the behaviours of HdfsClientConfigKeys.BlockWrite.
+ * ReplaceDatanodeOnFailure.MIN_REPLICATION.if live block location datanodes is
+ * greater than or equal to
+ * 'dfs.client.block.write.replace-datanode-on-failure.min.replication'
+ * threshold value, if yes continue writing to the two remaining nodes.
+ * Otherwise it will throw exception.
+ *
+ * If this HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+ * MIN_REPLICATION is set to 0 or less than zero, an exception will be thrown
+ * if a replacement could not be found.
+ */
+public class TestReplaceDatanodeFailureReplication {
+  static final Log LOG = LogFactory
+      .getLog(TestReplaceDatanodeFailureReplication.class);
+
+  static final String DIR =
+      "/" + TestReplaceDatanodeFailureReplication.class.getSimpleName() + "/";
+  static final short REPLICATION = 3;
+  final private static String RACK0 = "/rack0";
+
+  /**
+   * Test fail last datanode in the pipeline.
+   */
+  @Test
+  public void testLastDatanodeFailureInPipeline() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(2, 1, 10, false);
+  }
+
+  /**
+   * Test fail first datanode in the pipeline.
+   */
+  @Test
+  public void testFirstDatanodeFailureInPipeline() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(2, 0, 10, false);
+  }
+
+  /**
+   * Test fail all the datanodes except first in the pipeline.
+   */
+  @Test
+  public void testWithOnlyFirstDatanodeIsAlive() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(1, 1, 1, true);
+  }
+
+  /**
+   * Test fail all the datanodes except lastnode in the pipeline.
+   */
+  @Test
+  public void testWithOnlyLastDatanodeIsAlive() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(1, 0, 1, true);
+  }
+
+  /**
+   * Test when number of live nodes are less than the
+   * "dfs.client.block.write.replace-datanode-on-failure.min.replication".
+   */
+  @Test
+  public void testLessNumberOfLiveDatanodesThanWriteReplaceDatanodeOnFailureRF()
+      throws Exception {
+    final MiniDFSCluster cluster = setupCluster(2);
+
+    try {
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final Path dir = new Path(DIR);
+
+      final SlowWriter[] slowwriters = new SlowWriter[1];
+      for (int i = 1; i <= slowwriters.length; i++) {
+        // create slow writers in different speed
+        slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i),
+            i * 200L);
+      }
+
+      for (SlowWriter s : slowwriters) {
+        s.start();
+      }
+
+      // Let slow writers write something.
+      // Some of them are too slow and will be not yet started.
+      sleepSeconds(1);
+
+      // stop an old datanode
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(0);
+
+      // Let the slow writer writes a few more seconds
+      // Everyone should have written something.
+      sleepSeconds(20);
+
+      // check replication and interrupt.
+      for (SlowWriter s : slowwriters) {
+        try {
+          s.out.getCurrentBlockReplication();
+          Assert.fail(
+              "Must throw exception as failed to add a new datanode for write "
+                  + "pipeline, minimum failure replication");
+        } catch (IOException e) {
+          // expected
+        }
+        s.interruptRunning();
+      }
+
+      // close files
+      for (SlowWriter s : slowwriters) {
+        s.joinAndClose();
+      }
+
+      // Verify the file
+      verifyFileContent(fs, slowwriters);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private MiniDFSCluster setupCluster(int failRF) throws IOException {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, failRF);
+    // always replace a datanode
+    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, false, conf);
+
+    final String[] racks = new String[REPLICATION];
+    Arrays.fill(racks, RACK0);
+    return new MiniDFSCluster.Builder(conf).racks(racks)
+        .numDataNodes(REPLICATION).build();
+  }
+
+  private void testWriteFileAndVerifyAfterDNStop(int failRF, int dnindex,
+      int slowWrites, boolean failPipeLine)
+      throws IOException, InterruptedException, TimeoutException {
+    final MiniDFSCluster cluster = setupCluster(failRF);
+    try {
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final Path dir = new Path(DIR);
+
+      final SlowWriter[] slowwriters = new SlowWriter[slowWrites];
+      for (int i = 1; i <= slowwriters.length; i++) {
+        // create slow writers in different speed
+        slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i),
+            i * 200L);
+      }
+
+      for (SlowWriter s : slowwriters) {
+        s.start();
+      }
+
+      // Let slow writers write something.
+      // Some of them are too slow and will be not yet started.
+      sleepSeconds(3);
+
+      // stop an datanode
+      cluster.stopDataNode(dnindex);
+      if (failPipeLine) {
+        cluster.stopDataNode(dnindex);
+      }
+
+      // Let the slow writer writes a few more seconds
+      // Everyone should have written something.
+      sleepSeconds(5);
+      cluster.waitFirstBRCompleted(0, 10000);
+      // check replication and interrupt.
+      for (SlowWriter s : slowwriters) {
+        Assert.assertEquals(failRF, s.out.getCurrentBlockReplication());
+        s.interruptRunning();
+      }
+
+      // close files
+      for (SlowWriter s : slowwriters) {
+        s.joinAndClose();
+      }
+
+      // Verify the file
+      verifyFileContent(fs, slowwriters);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void verifyFileContent(DistributedFileSystem fs,
+      SlowWriter[] slowwriters) throws IOException {
+    LOG.info("Verify the file");
+    for (int i = 0; i < slowwriters.length; i++) {
+      LOG.info(slowwriters[i].filepath + ": length=" + fs
+          .getFileStatus(slowwriters[i].filepath).getLen());
+      FSDataInputStream in = null;
+      try {
+        in = fs.open(slowwriters[i].filepath);
+        for (int j = 0, x;; j++) {
+          x = in.read();
+          if ((x) != -1) {
+            Assert.assertEquals(j, x);
+          } else {
+            return;
+          }
+        }
+      } finally {
+        IOUtils.closeStream(in);
+      }
+    }
+  }
+
+  static void sleepSeconds(final int waittime) throws InterruptedException {
+    LOG.info("Wait " + waittime + " seconds");
+    Thread.sleep(waittime * 1000L);
+  }
+
+  static class SlowWriter extends Thread {
+    private final Path filepath;
+    private final HdfsDataOutputStream out;
+    private final long sleepms;
+    private volatile boolean running = true;
+
+    SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms)
+        throws IOException {
+      super(SlowWriter.class.getSimpleName() + ":" + filepath);
+      this.filepath = filepath;
+      this.out = (HdfsDataOutputStream) fs.create(filepath, REPLICATION);
+      this.sleepms = sleepms;
+    }
+
+    @Override public void run() {
+      int i = 0;
+      try {
+        sleep(sleepms);
+        for (; running; i++) {
+          LOG.info(getName() + " writes " + i);
+          out.write(i);
+          out.hflush();
+          sleep(sleepms);
+        }
+      } catch (InterruptedException e) {
+        LOG.info(getName() + " interrupted:" + e);
+      } catch (IOException e) {
+        throw new RuntimeException(getName(), e);
+      } finally {
+        LOG.info(getName() + " terminated: i=" + i);
+      }
+    }
+
+    void interruptRunning() {
+      running = false;
+      interrupt();
+    }
+
+    void joinAndClose() throws InterruptedException {
+      LOG.info(getName() + " join and close");
+      join();
+      IOUtils.closeStream(out);
+    }
+  }
+}
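The decision that the DataStreamer change adds, and that the tests above
exercise, can be restated compactly. The sketch below is an illustrative
restatement with hypothetical names, not code from the patch: it takes the
configured minimum replication and the number of datanodes still alive in the
pipeline, and says whether the write should continue when findNewDatanode()
cannot supply a replacement.

    /** Illustrative restatement of the HDFS-11799 pipeline-recovery rule. */
    class PipelineRecoveryRule {
      /**
       * @param remainingNodes datanodes still alive in the write pipeline
       * @param minReplication value of
       *        dfs.client.block.write.replace-datanode-on-failure.min-replication
       * @return true to keep writing to the surviving nodes, false to fail the write
       */
      static boolean continueWithoutReplacement(int remainingNodes,
          short minReplication) {
        // minReplication <= 0 (the default) keeps the old behaviour: failing to
        // find a replacement datanode always fails the write.
        return minReplication > 0 && remainingNodes >= minReplication;
      }
    }

Read against the tests: testWithOnlyFirstDatanodeIsAlive and
testWithOnlyLastDatanodeIsAlive continue with a single surviving node because
min-replication is 1, while
testLessNumberOfLiveDatanodesThanWriteReplaceDatanodeOnFailureRF expects the
write to fail because only one node survives against a threshold of 2.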

http://git-wip-us.apache.org/repos/asf/hadoop/blob/eddc5818/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index 347f130..53bc165 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -40,8 +40,9 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
   @Override
   public void initializeMemberVariables() {
     xmlFilename = new String("hdfs-default.xml");
-    configurationClasses = new Class[] { HdfsClientConfigKeys.class,
-        DFSConfigKeys.class };
+    configurationClasses =
+        new Class[] { HdfsClientConfigKeys.class, DFSConfigKeys.class,
+            HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };
 
     // Set error modes
     errorIfMissingConfigProps = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org