From: todd@apache.org
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org
Subject: svn commit: r1067287 - in /hadoop/hdfs/branches/branch-0.22: CHANGES.txt src/java/org/apache/hadoop/hdfs/DFSOutputStream.java src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
Date: Fri, 04 Feb 2011 21:13:46 -0000
Message-Id: <20110204211347.615ED23888CF@eris.apache.org>
X-Mailer: svnmailer-1.0.8

Author: todd
Date: Fri Feb 4 21:13:41 2011
New Revision: 1067287

URL: http://svn.apache.org/viewvc?rev=1067287&view=rev
Log:
HDFS-1529. Incorrect handling of interrupts in waitForAckedSeqno can cause
deadlock. Contributed by Todd Lipcon.

Modified:
    hadoop/hdfs/branches/branch-0.22/CHANGES.txt
    hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java

Modified: hadoop/hdfs/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/CHANGES.txt?rev=1067287&r1=1067286&r2=1067287&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/hdfs/branches/branch-0.22/CHANGES.txt Fri Feb 4 21:13:41 2011
@@ -439,6 +439,9 @@ Release 0.22.0 - Unreleased
     HDFS-900. Corrupt replicas are not processed correctly in block report
     (shv)
 
+    HDFS-1529. Incorrect handling of interrupts in waitForAckedSeqno can cause
+    deadlock (todd)
+
 Release 0.21.1 - Unreleased
 
   IMPROVEMENTS
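
Before the diffs, a note on the failure mode named in the log message: Object.wait() throws InterruptedException immediately, without blocking, whenever the calling thread's interrupt status is already set. The old waitForAckedSeqno() (last hunk of DFSOutputStream.java below) caught that exception, restored the status, and looped back into wait(), so an interrupted flusher could spin indefinitely instead of making progress. A standalone, JDK-only illustration of the spin (not HDFS code):

public class SpinDemo {
  public static void main(String[] args) {
    final Object lock = new Object();
    Thread.currentThread().interrupt();         // simulate an interrupted waiter
    synchronized (lock) {
      for (int i = 0; i < 3; i++) {
        try {
          lock.wait(1000);                      // throws at once: status is set
        } catch (InterruptedException e) {
          System.out.println("spun without blocking, iteration " + i);
          Thread.currentThread().interrupt();   // the old pattern: restore, retry
        }
      }
    }
  }
}

The fix instead converts the interrupt into an InterruptedIOException and hands the decision back to the caller.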

Modified: hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1067287&r1=1067286&r2=1067287&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Feb 4 21:13:41 2011
@@ -24,6 +24,7 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
+import java.io.InterruptedIOException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -1167,7 +1168,16 @@ class DFSOutputStream extends FSOutputSu
       while (!closed && dataQueue.size() + ackQueue.size() > MAX_PACKETS) {
         try {
           dataQueue.wait();
-        } catch (InterruptedException e) {
+        } catch (InterruptedException e) {
+          // If we get interrupted while waiting to queue data, we still need to get rid
+          // of the current packet. This is because we have an invariant that if
+          // currentPacket gets full, it will get queued before the next writeChunk.
+          //
+          // Rather than wait around for space in the queue, we should instead try to
+          // return to the caller as soon as possible, even though we slightly overrun
+          // the MAX_PACKETS length.
+          Thread.currentThread().interrupt();
+          break;
         }
       }
       isClosed();
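
The hunk above preserves an invariant of the write path: once currentPacket fills, it must be queued before the next writeChunk(), so an interrupted producer stops waiting for queue space but still enqueues, at the cost of slightly overrunning MAX_PACKETS. A minimal sketch of that pattern, with hypothetical names (an illustration, not the HDFS implementation):

import java.util.ArrayDeque;
import java.util.Deque;

class BoundedProducerSketch {
  private static final int MAX_ITEMS = 80;      // stand-in for MAX_PACKETS
  private final Deque<byte[]> queue = new ArrayDeque<byte[]>();

  synchronized void enqueue(byte[] packet) {
    while (queue.size() >= MAX_ITEMS) {
      try {
        wait();                                 // block until a consumer drains
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();     // preserve status for the caller
        break;                                  // accept a slight overrun
      }
    }
    queue.add(packet);                          // the packet is never dropped
    notifyAll();
  }
}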

@@ -1339,6 +1349,11 @@ class DFSOutputStream extends FSOutputSu
           throw ioe;
         }
       }
+    } catch (InterruptedIOException interrupt) {
+      // This kind of error doesn't mean that the stream itself is broken - just the
+      // flushing thread got interrupted. So, we shouldn't close down the writer,
+      // but instead just propagate the error
+      throw interrupt;
     } catch (IOException e) {
       DFSClient.LOG.warn("Error while syncing", e);
       synchronized (this) {
@@ -1416,7 +1431,8 @@ class DFSOutputStream extends FSOutputSu
         try {
           dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
         } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt();
+          throw new InterruptedIOException(
+            "Interrupted while waiting for data to be acknowledged by pipeline");
         }
       }
     }

Modified: hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java?rev=1067287&r1=1067286&r2=1067287&view=diff
==============================================================================
--- hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java (original)
+++ hadoop/hdfs/branches/branch-0.22/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java Fri Feb 4 21:13:41 2011
@@ -26,9 +26,12 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.log4j.Level;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import org.junit.Test;
 
+import java.io.InterruptedIOException;
 import java.io.IOException;
 
 /** Class contains a set of tests to verify the correctness of
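
The new test in the hunk that follows asserts assertTrue(...interrupted()) when the flush slips through before the wait, and assertFalse(...interrupted()) right after the catch. Both hinge on the JDK contract that Thread.interrupted() returns and then clears the current thread's interrupt status; the test spells the static call as Thread.currentThread().interrupted(), which is equivalent. A two-line refresher:

public class InterruptStatusDemo {
  public static void main(String[] args) {
    Thread.currentThread().interrupt();
    System.out.println(Thread.interrupted());   // true: flag was set, now cleared
    System.out.println(Thread.interrupted());   // false: first call cleared it
  }
}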
@@ -170,38 +173,107 @@ public class TestHFlush {
     System.out.println("p=" + p);
 
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
-    DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+    try {
+      DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+      byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+
+      // create a new file.
+      FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+      stm.write(fileContents, 0, 1);
+      Thread.sleep(timeout);
+      stm.hflush();
+      System.out.println("Wrote 1 byte and hflush " + p);
+
+      // write another byte
+      Thread.sleep(timeout);
+      stm.write(fileContents, 1, 1);
+      stm.hflush();
+
+      stm.write(fileContents, 2, 1);
+      Thread.sleep(timeout);
+      stm.hflush();
+
+      stm.write(fileContents, 3, 1);
+      Thread.sleep(timeout);
+      stm.write(fileContents, 4, 1);
+      stm.hflush();
+
+      stm.write(fileContents, 5, 1);
+      Thread.sleep(timeout);
+      stm.close();
+
+      // verify that entire file is good
+      AppendTestUtil.checkFullFile(fs, p, fileLen,
+          fileContents, "Failed to slowly write to a file");
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testHFlushInterrupted() throws Exception {
+    final int DATANODE_NUM = 2;
+    final int fileLen = 6;
     byte[] fileContents = AppendTestUtil.initBuffer(fileLen);
+    Configuration conf = new HdfsConfiguration();
+    final Path p = new Path("/hflush-interrupted");
 
-    // create a new file.
-    FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+    System.out.println("p=" + p);
 
-    stm.write(fileContents, 0, 1);
-    Thread.sleep(timeout);
-    stm.hflush();
-    System.out.println("Wrote 1 byte and hflush " + p);
-
-    // write another byte
-    Thread.sleep(timeout);
-    stm.write(fileContents, 1, 1);
-    stm.hflush();
-
-    stm.write(fileContents, 2, 1);
-    Thread.sleep(timeout);
-    stm.hflush();
-
-    stm.write(fileContents, 3, 1);
-    Thread.sleep(timeout);
-    stm.write(fileContents, 4, 1);
-    stm.hflush();
-
-    stm.write(fileContents, 5, 1);
-    Thread.sleep(timeout);
-    stm.close();
-
-    // verify that entire file is good
-    AppendTestUtil.checkFullFile(fs, p, fileLen,
-        fileContents, "Failed to slowly write to a file");
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(DATANODE_NUM).build();
+    try {
+      DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+
+      // create a new file.
+      FSDataOutputStream stm = AppendTestUtil.createFile(fs, p, DATANODE_NUM);
+
+      stm.write(fileContents, 0, 2);
+      Thread.currentThread().interrupt();
+      try {
+        stm.hflush();
+        // If we made it past the hflush(), then that means that the ack made it back
+        // from the pipeline before we got to the wait() call. In that case we should
+        // still have interrupted status.
+        assertTrue(Thread.currentThread().interrupted());
+      } catch (InterruptedIOException ie) {
+        System.out.println("Got expected exception during flush");
+      }
+      assertFalse(Thread.currentThread().interrupted());
+
+      // Try again to flush should succeed since we no longer have interrupt status
+      stm.hflush();
+
+      // Write some more data and flush
+      stm.write(fileContents, 2, 2);
+      stm.hflush();
+
+      // Write some data and close while interrupted
+
+      stm.write(fileContents, 4, 2);
+      Thread.currentThread().interrupt();
+      try {
+        stm.close();
+        // If we made it past the close(), then that means that the ack made it back
+        // from the pipeline before we got to the wait() call. In that case we should
+        // still have interrupted status.
+        assertTrue(Thread.currentThread().interrupted());
+      } catch (InterruptedIOException ioe) {
+        System.out.println("Got expected exception during close");
+        // If we got the exception, we shouldn't have interrupted status anymore.
+        assertFalse(Thread.currentThread().interrupted());
+
+        // Now do a successful close.
+        stm.close();
+      }
+
+
+      // verify that entire file is good
+      AppendTestUtil.checkFullFile(fs, p, fileLen,
+          fileContents, "Failed to deal with thread interruptions");
+    } finally {
+      cluster.shutdown();
+    }
   }
 }
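
Taken together, the change gives clients a usable contract: an interrupt during hflush() or close() surfaces as an InterruptedIOException and leaves the stream intact, as testHFlushInterrupted exercises above. A caller-side sketch under that assumption (the helper and its one-shot retry policy are illustrative, not part of this commit):

import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.fs.FSDataOutputStream;

class FlushRetrySketch {
  // Retry a single flush after an interrupt, mirroring the test above:
  // the stream is not torn down, so a second hflush() can succeed.
  static void flushClearingInterrupt(FSDataOutputStream stm) throws IOException {
    try {
      stm.hflush();
    } catch (InterruptedIOException iie) {
      Thread.interrupted();   // acknowledge and clear our interrupt status
      stm.hflush();           // stream still healthy; retry once
    }
  }
}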