Reply-To: core-dev@hadoop.apache.org
To: core-commits@hadoop.apache.org
From: rangadi@apache.org
Subject: svn commit: r694755 - in /hadoop/core/trunk: CHANGES.txt src/hdfs/org/apache/hadoop/hdfs/DFSClient.java src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Date: Fri, 12 Sep 2008 17:56:15 -0000
Message-Id: <20080912175615.C2FAC2388A0F@eris.apache.org>

Author: rangadi
Date: Fri Sep 12 10:56:15 2008
New Revision: 694755

URL: http://svn.apache.org/viewvc?rev=694755&view=rev
Log:
HADOOP-3831. Very slow reading clients sometimes failed while reading. (rangadi)

Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=694755&r1=694754&r2=694755&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Sep 12 10:56:15 2008
@@ -537,6 +537,9 @@
     HADOOP-4112. Handles cleanupTask in JobHistory
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-3831. Very slow reading clients sometimes failed while reading.
+    (rangadi)
+
 Release 0.18.1 - Unreleased
 
   IMPROVEMENTS
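A note on usage: the DFSClient change below reads the block-acquire retry limit
from "dfs.client.max.block.acquire.failures", falling back to the compiled-in
MAX_BLOCK_ACQUIRE_FAILURES default. A client that needs more retries for very
slow reads could override the key in its own Configuration, along the lines of
the following sketch. Only the config key name comes from this patch; the class
itself is purely illustrative example code.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    // Illustrative only: raise the per-read block-acquire retry limit
    // introduced by this patch before creating the FileSystem client.
    public class RaiseBlockAcquireRetries {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // raise the limit above the compiled-in default
        conf.setInt("dfs.client.max.block.acquire.failures", 5);
        FileSystem fs = FileSystem.get(conf);   // DFSClient is created from this conf
        System.out.println("connected to " + fs.getUri());
      }
    }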
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=694755&r1=694754&r2=694755&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 12 10:56:15 2008
@@ -80,6 +80,7 @@
   private int datanodeWriteTimeout;
   final int writePacketSize;
   private FileSystem.Statistics stats;
+  private int maxBlockAcquireFailures;
 
   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
@@ -162,6 +163,9 @@
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
+    this.maxBlockAcquireFailures = 
+        conf.getInt("dfs.client.max.block.acquire.failures",
+                    MAX_BLOCK_ACQUIRE_FAILURES);
 
     try {
       this.ugi = UnixUserGroupInformation.login(conf, true);
@@ -1473,6 +1477,14 @@
     private synchronized int readBuffer(byte buf[], int off, int len) throws IOException {
       IOException ioe;
+
+      /* We retry the current node only once, so this is set to true only here.
+       * The intention is to handle one common case of an error that is not a
+       * failure on the datanode or the client: the DataNode closes the
+       * connection because the client has been idle. If there are other cases
+       * of "non-errors", a datanode might be retried by setting this to true again.
+       */
+      boolean retryCurrentNode = true;
 
       while (true) {
         // retry as many times as seekToNewSource allows.
@@ -1483,16 +1495,30 @@
                    currentNode.getName() + " at " + ce.getPos());
           reportChecksumFailure(src, currentBlock, currentNode);
           ioe = ce;
+          retryCurrentNode = false;
         } catch ( IOException e ) {
-          LOG.warn("Exception while reading from " + currentBlock +
-                   " of " + src + " from " + currentNode + ": " +
-                   StringUtils.stringifyException(e));
+          if (!retryCurrentNode) {
+            LOG.warn("Exception while reading from " + currentBlock +
+                     " of " + src + " from " + currentNode + ": " +
+                     StringUtils.stringifyException(e));
+          }
           ioe = e;
         }
-        addToDeadNodes(currentNode);
-        if (!seekToNewSource(pos)) {
-          throw ioe;
+        boolean sourceFound = false;
+        if (retryCurrentNode) {
+          /* possibly retry the same node so that transient errors don't
+           * result in application level failures (e.g. the Datanode could have
+           * closed the connection because the client was idle for too long).
+           */
+          sourceFound = seekToBlockSource(pos);
+        } else {
+          addToDeadNodes(currentNode);
+          sourceFound = seekToNewSource(pos);
+        }
+        if (!sourceFound) {
+          throw ioe;
         }
+        retryCurrentNode = false;
       }
     }
@@ -1554,7 +1580,7 @@
           return new DNAddrPair(chosenNode, targetAddr);
         } catch (IOException ie) {
           String blockInfo = block.getBlock() + " file=" + src;
-          if (failures >= MAX_BLOCK_ACQUIRE_FAILURES) {
+          if (failures >= maxBlockAcquireFailures) {
             throw new IOException("Could not obtain block: " + blockInfo);
           }
@@ -1727,6 +1753,16 @@
     }
 
     /**
+     * Same as {@link #seekToNewSource(long)} except that it does not exclude
+     * the current datanode and might connect to the same node.
+     */
+    private synchronized boolean seekToBlockSource(long targetPos)
+                                                   throws IOException {
+      currentNode = blockSeekTo(targetPos);
+      return true;
+    }
+
+    /**
      * Seek to given position on a node other than the current node.  If
      * a node other than the current node is found, then returns true.
     * If another node could not be found, then returns false.
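The heart of the change above is the retry policy in readBuffer(): an
IOException from the current datanode is first retried once against the same
node, which covers the common case of the datanode closing an idle connection
while the client is reading very slowly, and only a second failure marks the
node dead and switches to another replica. The following is a condensed,
illustrative model of that control flow only; the abstract helper methods are
hypothetical stand-ins, not real DFSClient APIs.

    import java.io.IOException;

    // Condensed, illustrative model of the retry-once logic in readBuffer() above.
    // The abstract helpers are hypothetical stand-ins, not real DFSClient APIs.
    abstract class RetryOnceReader {
      abstract int readFromCurrentNode(byte[] buf, int off, int len) throws IOException;
      abstract boolean reconnectToCurrentNode();  // same datanode, fresh connection
      abstract boolean switchToNewNode();         // a different replica, if any
      abstract void markCurrentNodeDead();

      int readWithRetry(byte[] buf, int off, int len) throws IOException {
        boolean retryCurrentNode = true;   // at most one retry against the same node
        while (true) {
          try {
            return readFromCurrentNode(buf, off, len);
          } catch (IOException e) {
            boolean sourceFound;
            if (retryCurrentNode) {
              // transient error, e.g. the datanode closed an idle connection:
              // try the same datanode once more
              sourceFound = reconnectToCurrentNode();
            } else {
              // second failure: give up on this node and look for another replica
              markCurrentNodeDead();
              sourceFound = switchToNewNode();
            }
            if (!sourceFound) {
              throw e;                     // nothing left to try, surface the error
            }
            retryCurrentNode = false;
          }
        }
      }
    }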
Added: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=694755&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Sep 12 10:56:15 2008
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+
+import junit.framework.TestCase;
+
+
+/**
+ * These tests make sure that DFSClient retries fetching data from DFS
+ * properly in case of errors.
+ */
+public class TestDFSClientRetries extends TestCase {
+
+  // writes 'len' bytes of data to out.
+  private static void writeData(OutputStream out, int len) throws IOException {
+    byte [] buf = new byte[4096*16];
+    while(len > 0) {
+      int toWrite = Math.min(len, buf.length);
+      out.write(buf, 0, toWrite);
+      len -= toWrite;
+    }
+  }
+
+  /**
+   * This makes sure that when the DN closes the client's socket after the
+   * client has successfully connected earlier, the data can still be fetched.
+   */
+  public void testWriteTimeoutAtDataNode() throws IOException,
+                                                  InterruptedException {
+    Configuration conf = new Configuration();
+
+    final int writeTimeout = 100; // milliseconds.
+    // set a very short write timeout for the datanode, so that the test runs fast.
+    conf.setInt("dfs.datanode.socket.write.timeout", writeTimeout);
+    // set a smaller block size
+    final int blockSize = 10*1024*1024;
+    conf.setInt("dfs.block.size", blockSize);
+    conf.setInt("dfs.client.max.block.acquire.failures", 1);
+    // set a small buffer size
+    final int bufferSize = 4096;
+    conf.setInt("io.file.buffer.size", bufferSize);
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+
+    try {
+      cluster.waitActive();
+      FileSystem fs = cluster.getFileSystem();
+
+      Path filePath = new Path("/testWriteTimeoutAtDataNode");
+      OutputStream out = fs.create(filePath, true, bufferSize);
+
+      // write a 2 block file.
+      writeData(out, 2*blockSize);
+      out.close();
+
+      byte[] buf = new byte[1024*1024]; // enough to empty TCP buffers.
+
+      InputStream in = fs.open(filePath, bufferSize);
+
+      // first read a few bytes
+      IOUtils.readFully(in, buf, 0, bufferSize/2);
+      // now read a few more chunks of data, sleeping in between:
+      for(int i=0; i<10; i++) {
+        Thread.sleep(2*writeTimeout); // force a write timeout at the datanode.
+        // read enough to empty out the socket buffers.
+        IOUtils.readFully(in, buf, 0, buf.length);
+      }
+      // successfully read with write timeouts on the datanodes.
+      in.close();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  // more tests related to different failure cases can be added here.
+}
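The test exercises the new retry path by making the datanode write timeout
(100 ms) much shorter than the 200 ms sleeps between reads, so the datanode
keeps closing the idle connection and DFSClient has to reconnect to the same
node transparently; with dfs.client.max.block.acquire.failures set to 1, a
failure to do so would surface immediately as an IOException. Assuming the
standard ant build on trunk, the test should be runnable on its own with
something like:

    ant test -Dtestcase=TestDFSClientRetries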