From: wheat9@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Message-Id: <8240d65659fd4f92b05c6f802cec6819@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: hadoop git commit: HDFS-8008. Support client-side back off when the datanodes are congested. Contributed by Haohui Mai.
Date: Wed, 1 Apr 2015 23:54:54 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 9e114ee60 -> cfcf79549


HDFS-8008. Support client-side back off when the datanodes are congested. Contributed by Haohui Mai.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cfcf7954
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cfcf7954
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cfcf7954

Branch: refs/heads/branch-2
Commit: cfcf795492f960faa7891044cc79ea9d2051387b
Parents: 9e114ee
Author: Haohui Mai <wheat9@apache.org>
Authored: Wed Apr 1 16:54:46 2015 -0700
Committer: Haohui Mai <wheat9@apache.org>
Committed: Wed Apr 1 16:54:53 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  3 +
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 63 ++++++++++++++++++++
 .../hdfs/protocol/datatransfer/PipelineAck.java |  4 ++
 .../apache/hadoop/hdfs/TestDFSOutputStream.java | 42 +++++++++++++
 4 files changed, 112 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfcf7954/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8f9fcd9..3bace16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -559,6 +559,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7742. Favoring decommissioning node for replication can cause a block
     to stay underreplicated for long periods (Nathan Roberts via kihwal)
 
+    HDFS-8008. Support client-side back off when the datanodes are congested.
+    (wheat9)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfcf7954/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 9c437ba..6ff4c24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -218,6 +218,13 @@ class DataStreamer extends Daemon {
   private boolean failPacket = false;
   private final long dfsclientSlowLogThresholdMs;
   private long artificialSlowdown = 0;
+  // List of congested data nodes. The stream will back off if the DataNodes
+  // are congested
+  private final ArrayList<DatanodeInfo> congestedNodes = new ArrayList<>();
+  private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000;
+  private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS =
+      CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10;
+  private int lastCongestionBackoffTime;
 
   private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes;
 
@@ -386,6 +393,11 @@ class DataStreamer extends Daemon {
             one = createHeartbeatPacket();
             assert one != null;
           } else {
+            try {
+              backOffIfNecessary();
+            } catch (InterruptedException e) {
+              DFSClient.LOG.warn("Caught exception ", e);
+            }
             one = dataQueue.getFirst(); // regular data packet
             long parents[] = one.getTraceParents();
             if (parents.length > 0) {
@@ -815,9 +827,14 @@ class DataStreamer extends Daemon {
           long seqno = ack.getSeqno();
 
           // processes response status from datanodes.
+          ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
           for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
             final Status reply = PipelineAck.getStatusFromHeader(ack
                 .getHeaderFlag(i));
+            if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
+                PipelineAck.ECN.CONGESTED) {
+              congestedNodesFromAck.add(targets[i]);
+            }
             // Restart will not be treated differently unless it is
             // the local node or the only one in the pipeline.
             if (PipelineAck.isRestartOOBStatus(reply) &&
@@ -839,6 +856,18 @@
             }
           }
 
+          if (!congestedNodesFromAck.isEmpty()) {
+            synchronized (congestedNodes) {
+              congestedNodes.clear();
+              congestedNodes.addAll(congestedNodesFromAck);
+            }
+          } else {
+            synchronized (congestedNodes) {
+              congestedNodes.clear();
+              lastCongestionBackoffTime = 0;
+            }
+          }
+
           assert seqno != PipelineAck.UNKOWN_SEQNO :
               "Ack for unknown seqno should be a failed ack: " + ack;
           if (seqno == DFSPacket.HEART_BEAT_SEQNO) {  // a heartbeat ack
@@ -1544,6 +1573,40 @@
   }
 
   /**
+   * This function sleeps for a certain amount of time when the writing
+   * pipeline is congested. The function calculates the time based on a
+   * decorrelated filter.
+   *
+   * @see
+   * http://www.awsarchitectureblog.com/2015/03/backoff.html
+   */
+  private void backOffIfNecessary() throws InterruptedException {
+    int t = 0;
+    synchronized (congestedNodes) {
+      if (!congestedNodes.isEmpty()) {
+        StringBuilder sb = new StringBuilder("DataNode");
+        for (DatanodeInfo i : congestedNodes) {
+          sb.append(' ').append(i);
+        }
+        int range = Math.abs(lastCongestionBackoffTime * 3 -
+            CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
+        int base = Math.min(lastCongestionBackoffTime * 3,
+            CONGESTION_BACKOFF_MEAN_TIME_IN_MS);
+        t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS,
+            (int)(base + Math.random() * range));
+        lastCongestionBackoffTime = t;
+        sb.append(" are congested. Backing off for ").append(t).append(" ms");
+        DFSClient.LOG.info(sb.toString());
+        congestedNodes.clear();
+      }
+    }
+    if (t != 0) {
+      Thread.sleep(t);
+    }
+  }
+
+  /**
    * get the block this streamer is writing to
    *
    * @return the block this streamer is writing to


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfcf7954/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
index 9bd4115..a811f39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java
@@ -257,6 +257,10 @@ public class PipelineAck {
     return StatusFormat.getStatus(header);
   }
 
+  public static ECN getECNFromHeader(int header) {
+    return StatusFormat.getECN(header);
+  }
+
   public static int setStatusForHeader(int old, Status status) {
     return StatusFormat.setStatus(old, status);
   }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cfcf7954/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index b47e7f1..a410e74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -17,20 +17,31 @@
  */
 package org.apache.hadoop.hdfs;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.internal.util.reflection.Whitebox;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
 public class TestDFSOutputStream {
   static MiniDFSCluster cluster;
 
@@ -100,6 +111,37 @@ public class TestDFSOutputStream {
     Assert.assertTrue((Integer) field.get(dos) + 257 < packetSize);
   }
 
+  @Test
+  public void testCongestionBackoff() throws IOException {
+    DFSClient.Conf dfsClientConf = mock(DFSClient.Conf.class);
+    DFSClient client = mock(DFSClient.class);
+    when(client.getConf()).thenReturn(dfsClientConf);
+    client.clientRunning = true;
+    DataStreamer stream = new DataStreamer(
+        mock(HdfsFileStatus.class),
+        mock(ExtendedBlock.class),
+        client,
+        "foo", null, null, null, null);
+
+    DataOutputStream blockStream = mock(DataOutputStream.class);
+    doThrow(new IOException()).when(blockStream).flush();
+    Whitebox.setInternalState(stream, "blockStream", blockStream);
+    Whitebox.setInternalState(stream, "stage",
+        BlockConstructionStage.PIPELINE_CLOSE);
+    @SuppressWarnings("unchecked")
+    LinkedList<DFSPacket> dataQueue = (LinkedList<DFSPacket>)
+        Whitebox.getInternalState(stream, "dataQueue");
+    @SuppressWarnings("unchecked")
+    ArrayList<DatanodeInfo> congestedNodes = (ArrayList<DatanodeInfo>)
+        Whitebox.getInternalState(stream, "congestedNodes");
+    congestedNodes.add(mock(DatanodeInfo.class));
+    DFSPacket packet = mock(DFSPacket.class);
+    when(packet.getTraceParents()).thenReturn(new long[] {});
+    dataQueue.add(packet);
+    stream.run();
+    Assert.assertTrue(congestedNodes.isEmpty());
+  }
+
   @AfterClass
   public static void tearDown() {
     cluster.shutdown();
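
----------------------------------------------------------------------

The backoff in backOffIfNecessary() is a variant of the decorrelated-jitter
scheme from the AWS Architecture Blog post referenced in its javadoc: each
delay is drawn uniformly between min(3 * previous delay, mean) and
max(3 * previous delay, mean), capped at ten times the mean (50 s). Below is
a minimal standalone sketch of just that calculation; it is not part of the
commit, and the names DecorrelatedJitterBackoff and nextDelayMs are
illustrative only.

    import java.util.Random;

    public class DecorrelatedJitterBackoff {
      // Mirrors CONGESTION_BACKOFF_MEAN_TIME_IN_MS and
      // CONGESTION_BACK_OFF_MAX_TIME_IN_MS from DataStreamer.
      private static final int MEAN_TIME_IN_MS = 5000;
      private static final int MAX_TIME_IN_MS = MEAN_TIME_IN_MS * 10;

      private final Random random = new Random();
      // Reset to 0 once an ack reports no congested nodes, as the commit does.
      private int lastBackoffMs = 0;

      // Next sleep: uniform in [min(3*prev, mean), max(3*prev, mean)], capped at MAX.
      public int nextDelayMs() {
        int range = Math.abs(lastBackoffMs * 3 - MEAN_TIME_IN_MS);
        int base = Math.min(lastBackoffMs * 3, MEAN_TIME_IN_MS);
        int t = Math.min(MAX_TIME_IN_MS, (int) (base + random.nextDouble() * range));
        lastBackoffMs = t;
        return t;
      }

      public static void main(String[] args) {
        DecorrelatedJitterBackoff backoff = new DecorrelatedJitterBackoff();
        for (int i = 0; i < 5; i++) {
          System.out.println("backoff #" + (i + 1) + " = " + backoff.nextDelayMs() + " ms");
        }
      }
    }

Successive delays grow roughly geometrically up to the 50 s cap, while the
jitter keeps concurrent writers from backing off in lock step once a pipeline
ack carries the ECN.CONGESTED flag.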