Subject: svn commit: r1377372 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: CHANGES.txt src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
Date: Sun, 26 Aug 2012 04:00:27 -0000
To: hdfs-commits@hadoop.apache.org
From: suresh@apache.org
Message-Id: <20120826040027.871EA2388A91@eris.apache.org>

Author: suresh
Date: Sun Aug 26 04:00:26 2012
New Revision: 1377372

URL: http://svn.apache.org/viewvc?rev=1377372&view=rev
Log:
HDFS-3851. DFSOutputStream class code cleanup. Contributed by Jing Zhao.

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1377372&r1=1377371&r2=1377372&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sun Aug 26 04:00:26 2012
@@ -129,6 +129,8 @@ Trunk (unreleased changes)
     HDFS-3844. Add @Override and remove {@inheritdoc} and unnecessary
     imports. (Jing Zhao via suresh)
 
+    HDFS-3851. DFSOutputStream class code cleanup.
+    (Jing Zhao via suresh)
 
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1377372&r1=1377371&r2=1377372&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Sun Aug 26 04:00:26 2012
@@ -56,8 +56,8 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
@@ -107,8 +107,8 @@ import com.google.common.annotations.Vis
  ****************************************************************/
 @InterfaceAudience.Private
 public class DFSOutputStream extends FSOutputSummer implements Syncable {
-  private final DFSClient dfsClient;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
+  private final DFSClient dfsClient;
   private Socket s;
   // closed is accessed by different threads under different locks.
   private volatile boolean closed = false;
@@ -138,15 +138,15 @@ public class DFSOutputStream extends FSO
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
 
-  private class Packet {
-    long seqno;                        // sequence number of buffer in block
-    long offsetInBlock;                // offset in block
-    private boolean lastPacketInBlock; // is this the last packet in block?
-    boolean syncBlock;                 // this packet forces the current block to disk
-    int numChunks;                     // number of chunks currently in packet
-    int maxChunks;                     // max chunks in packet
-
+  private static class Packet {
+    private static final long HEART_BEAT_SEQNO = -1L;
+    long seqno;                        // sequence number of buffer in block
+    final long offsetInBlock;          // offset in block
+    boolean syncBlock;                 // this packet forces the current block to disk
+    int numChunks;                     // number of chunks currently in packet
+    final int maxChunks;               // max chunks in packet
     byte[] buf;
+    private boolean lastPacketInBlock; // is this the last packet in block?
 
     /**
      * buf is pointed into like follows:
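The heart of this cleanup is the change above from "private class Packet" to "private static class Packet". A non-static inner class carries a hidden reference to its enclosing DFSOutputStream, which let the old constructors silently read outer state such as the checksum size and the current sequence number; the static version must receive those values as explicit constructor arguments. The sketch below is a minimal, self-contained illustration of that pattern only; the class, field, and method names are stand-ins, not the real Hadoop code.

// Simplified illustration of the refactoring pattern in this commit;
// names are hypothetical stand-ins, not actual DFSOutputStream members.
public class NestedClassRefactorDemo {

  // Before: a non-static inner class could read outer state implicitly,
  // e.g. "seqno = currentSeqno++" -- the dependency is invisible at the
  // construction site and every packet pins the outer stream in memory.

  // After: a static nested class receives everything it needs explicitly.
  private static class Packet {
    static final long HEART_BEAT_SEQNO = -1L; // sentinel for heartbeat packets

    final long seqno;          // sequence number, now supplied by the caller
    final long offsetInBlock;  // offset in block
    final int dataStart;       // computed from the caller-supplied checksum size

    Packet(long offsetInBlock, long seqno, int chunksPerPkt, int checksumSize) {
      this.offsetInBlock = offsetInBlock;
      this.seqno = seqno;
      this.dataStart = chunksPerPkt * checksumSize; // no hidden outer read
    }

    // Heartbeat packets reuse the main constructor with a sentinel seqno.
    static Packet heartbeat(int checksumSize) {
      return new Packet(0, HEART_BEAT_SEQNO, 0, checksumSize);
    }
  }

  private long currentSeqno = 0;      // outer state, now passed in explicitly
  private final int checksumSize = 4;

  public static void main(String[] args) {
    NestedClassRefactorDemo demo = new NestedClassRefactorDemo();
    Packet p = new Packet(0, demo.currentSeqno++, 128, demo.checksumSize);
    Packet hb = Packet.heartbeat(demo.checksumSize);
    System.out.println("data packet seqno=" + p.seqno
        + ", heartbeat seqno=" + hb.seqno);
  }
}

Besides making the dependencies visible at each call site, the static form means a queued Packet no longer keeps the whole output stream reachable for the garbage collector.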
@@ -164,45 +164,36 @@ public class DFSOutputStream extends FSO
      */
     int checksumStart;
     int checksumPos;
-    int dataStart;
+    final int dataStart;
     int dataPos;
 
-    private static final long HEART_BEAT_SEQNO = -1L;
-
     /**
      * Create a heartbeat packet.
      */
-    Packet() {
-      this.lastPacketInBlock = false;
-      this.numChunks = 0;
-      this.offsetInBlock = 0;
-      this.seqno = HEART_BEAT_SEQNO;
-
-      buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
-
-      checksumStart = checksumPos = dataPos = dataStart = PacketHeader.PKT_MAX_HEADER_LEN;
-      maxChunks = 0;
+    Packet(int checksumSize) {
+      this(0, 0, 0, HEART_BEAT_SEQNO, checksumSize);
     }
 
     /**
      * Create a new packet.
      *
-     * @param pktSize maximum size of the packet, including checksum data and actual data.
+     * @param pktSize maximum size of the packet,
+     *                including checksum data and actual data.
      * @param chunksPerPkt maximum number of chunks per packet.
      * @param offsetInBlock offset in bytes into the HDFS block.
      */
-    Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
+    Packet(int pktSize, int chunksPerPkt, long offsetInBlock,
+        long seqno, int checksumSize) {
       this.lastPacketInBlock = false;
       this.numChunks = 0;
       this.offsetInBlock = offsetInBlock;
-      this.seqno = currentSeqno;
-      currentSeqno++;
+      this.seqno = seqno;
 
       buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN + pktSize];
 
       checksumStart = PacketHeader.PKT_MAX_HEADER_LEN;
       checksumPos = checksumStart;
-      dataStart = checksumStart + (chunksPerPkt * checksum.getChecksumSize());
+      dataStart = checksumStart + (chunksPerPkt * checksumSize);
       dataPos = dataStart;
       maxChunks = chunksPerPkt;
     }
@@ -412,6 +403,7 @@ public class DFSOutputStream extends FSO
           response.join();
           response = null;
         } catch (InterruptedException e) {
+          DFSClient.LOG.warn("Caught exception ", e);
         }
       }
@@ -439,6 +431,7 @@ public class DFSOutputStream extends FSO
             try {
               dataQueue.wait(timeout);
             } catch (InterruptedException e) {
+              DFSClient.LOG.warn("Caught exception ", e);
             }
             doSleep = false;
             now = Time.now();
@@ -448,7 +441,7 @@ public class DFSOutputStream extends FSO
           }
           // get packet to be sent.
           if (dataQueue.isEmpty()) {
-            one = new Packet();  // heartbeat packet
+            one = new Packet(checksum.getChecksumSize());  // heartbeat packet
           } else {
             one = dataQueue.getFirst(); // regular data packet
           }
@@ -488,6 +481,7 @@ public class DFSOutputStream extends FSO
                 // wait for acks to arrive from datanodes
                 dataQueue.wait(1000);
               } catch (InterruptedException e) {
+                DFSClient.LOG.warn("Caught exception ", e);
               }
             }
           }
@@ -518,7 +512,7 @@ public class DFSOutputStream extends FSO
             blockStream.flush();
           } catch (IOException e) {
             // HDFS-3398 treat primary DN is down since client is unable to
-            // write to primary DN 
+            // write to primary DN
             errorIndex = 0;
             throw e;
           }
@@ -607,6 +601,7 @@ public class DFSOutputStream extends FSO
         response.close();
         response.join();
       } catch (InterruptedException e) {
+        DFSClient.LOG.warn("Caught exception ", e);
       } finally {
         response = null;
       }
@@ -1178,6 +1173,7 @@ public class DFSOutputStream extends FSO
             Thread.sleep(sleeptime);
             sleeptime *= 2;
           } catch (InterruptedException ie) {
+            DFSClient.LOG.warn("Caught exception ", ie);
           }
         }
       } else {
@@ -1421,7 +1417,7 @@ public class DFSOutputStream extends FSO
     if (currentPacket == null) {
       currentPacket = new Packet(packetSize, chunksPerPacket,
-          bytesCurBlock);
+          bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" +
             currentPacket.seqno +
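Note the recurring change in the hunks above: previously empty "catch (InterruptedException e) {}" blocks now log the exception through DFSClient.LOG, so a swallowed interrupt at least leaves a trace in the client log. Below is a minimal sketch of that pattern, using java.util.logging as a stand-in for DFSClient.LOG; the class name is hypothetical, and the commented-out interrupt() call shows a stricter variant that this patch does not adopt.

import java.util.logging.Logger;

public class InterruptHandlingDemo {
  private static final Logger LOG = Logger.getLogger("InterruptHandlingDemo");

  private final Object dataQueue = new Object();

  // Waits up to 'timeout' ms on the queue, mirroring the streamer loop above.
  void awaitWork(long timeout) {
    synchronized (dataQueue) {
      try {
        dataQueue.wait(timeout);
      } catch (InterruptedException e) {
        // What this commit adds: make the swallowed interrupt visible.
        LOG.warning("Caught exception " + e);
        // A stricter variant (not in the patch) would also re-assert the
        // flag so callers can observe the interruption:
        // Thread.currentThread().interrupt();
      }
    }
  }

  public static void main(String[] args) {
    new InterruptHandlingDemo().awaitWork(10);
    System.out.println("done");
  }
}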
@@ -1468,7 +1464,8 @@ public class DFSOutputStream extends FSO
       // indicate the end of block and reset bytesCurBlock.
       //
       if (bytesCurBlock == blockSize) {
-        currentPacket = new Packet(0, 0, bytesCurBlock);
+        currentPacket = new Packet(0, 0, bytesCurBlock,
+            currentSeqno++, this.checksum.getChecksumSize());
         currentPacket.lastPacketInBlock = true;
         currentPacket.syncBlock = shouldSyncBlock;
         waitAndQueueCurrentPacket();
@@ -1540,7 +1537,7 @@ public class DFSOutputStream extends FSO
           // but sync was requested.
           // Send an empty packet
           currentPacket = new Packet(packetSize, chunksPerPacket,
-              bytesCurBlock);
+              bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
         }
       } else {
         // We already flushed up to this offset.
@@ -1557,7 +1554,7 @@ public class DFSOutputStream extends FSO
           // and sync was requested.
           // So send an empty sync packet.
           currentPacket = new Packet(packetSize, chunksPerPacket,
-              bytesCurBlock);
+              bytesCurBlock, currentSeqno++, this.checksum.getChecksumSize());
         } else {
           // just discard the current packet since it has already been sent.
           currentPacket = null;
         }
@@ -1738,7 +1735,8 @@ public class DFSOutputStream extends FSO
     if (bytesCurBlock != 0) {
       // send an empty packet to mark the end of the block
-      currentPacket = new Packet(0, 0, bytesCurBlock);
+      currentPacket = new Packet(0, 0, bytesCurBlock,
+          currentSeqno++, this.checksum.getChecksumSize());
       currentPacket.lastPacketInBlock = true;
       currentPacket.syncBlock = shouldSyncBlock;
     }
@@ -1778,6 +1776,7 @@ public class DFSOutputStream extends FSO
           DFSClient.LOG.info("Could not complete file " + src + " retrying...");
         }
       } catch (InterruptedException ie) {
+        DFSClient.LOG.warn("Caught exception ", ie);
       }
     }
   }
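With the constructor change applied throughout, the sequence number advances at each construction site (currentSeqno++) rather than inside Packet itself, and an end-of-block marker is simply a zero-payload packet with lastPacketInBlock set, as in the hunks above. The following is a small self-contained sketch of that call-site pattern; the names are hypothetical stand-ins, not the Hadoop API.

import java.util.ArrayDeque;
import java.util.Deque;

public class EndOfBlockDemo {

  // Hypothetical stand-in for DFSOutputStream.Packet after the cleanup.
  private static class Packet {
    final long seqno;
    final long offsetInBlock;
    final byte[] buf;
    boolean lastPacketInBlock;

    Packet(int pktSize, long offsetInBlock, long seqno) {
      this.buf = new byte[pktSize];
      this.offsetInBlock = offsetInBlock;
      this.seqno = seqno;
    }
  }

  private long currentSeqno = 0;                       // advanced at each call site
  private final Deque<Packet> dataQueue = new ArrayDeque<>();

  void enqueueData(int pktSize, long bytesCurBlock) {
    dataQueue.add(new Packet(pktSize, bytesCurBlock, currentSeqno++));
  }

  // Zero-payload packet whose only job is to mark the end of the block.
  void enqueueEndOfBlock(long bytesCurBlock) {
    Packet marker = new Packet(0, bytesCurBlock, currentSeqno++);
    marker.lastPacketInBlock = true;
    dataQueue.add(marker);
  }

  public static void main(String[] args) {
    EndOfBlockDemo demo = new EndOfBlockDemo();
    demo.enqueueData(64 * 1024, 0);
    demo.enqueueEndOfBlock(64 * 1024);
    for (Packet p : demo.dataQueue) {
      System.out.println("seqno=" + p.seqno + " last=" + p.lastPacketInBlock);
    }
  }
}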