Subject: svn commit: r1400219 [1/2] - in /hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/mai... Date: Fri, 19 Oct 2012 18:49:45 -0000 To: hdfs-commits@hadoop.apache.org From: sseth@apache.org Author: sseth Date: Fri Oct 19 18:49:38 2012 New Revision: 1400219 URL: http://svn.apache.org/viewvc?rev=1400219&view=rev Log: merge from trunk to branch MR-3902 Added: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/ - copied from r1400218, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java - copied unchanged from r1400218, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/datatransfer/TestPacketReceiver.java Removed: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/packages/ Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java 
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java 
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/metrics/TestNameNodeMetrics.java Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Oct 19 18:49:38 2012 @@ -147,6 +147,9 @@ Trunk (Unreleased) Block Pool Used, Block Pool Used(%) and Failed Volumes. (Brahma Reddy Battula via suresh) + HDFS-4052. BlockManager#invalidateWork should print log outside the lock. + (Jing Zhao via suresh) + OPTIMIZATIONS BUG FIXES @@ -224,8 +227,6 @@ Trunk (Unreleased) HDFS-3834. Remove unused static fields NAME, DESCRIPTION and Usage from Command. (Jing Zhao via suresh) - HDFS-3678. Edit log files are never being purged from 2NN. (atm) - HADOOP-8158. Interrupting hadoop fs -put from the command line causes a LeaseExpiredException. (daryn via harsh) @@ -343,6 +344,8 @@ Release 2.0.3-alpha - Unreleased HDFS-3912. Detect and avoid stale datanodes for writes. (Jing Zhao via suresh) + HDFS-4059. Add number of stale DataNodes to metrics. (Jing Zhao via suresh) + IMPROVEMENTS HDFS-3925. Prettify PipelineAck#toString() for printing to a log @@ -388,6 +391,20 @@ Release 2.0.3-alpha - Unreleased HDFS-4036. Remove "throw UnresolvedLinkException" from FSDirectory.unprotectedAddFile(..). (Jing Zhao via szetszwo) + HDFS-2946. HA: Put a cap on the number of completed edits files retained + by the NN. (atm) + + HDFS-4029. GenerationStamp should use an AtomicLong. (eli) + + HDFS-4068. DatanodeID and DatanodeInfo member should be private. (eli) + + HDFS-4073. Two minor improvements to FSDirectory. (Jing Zhao via szetszwo) + + HDFS-4074. Remove the unused default constructor from INode. (Brandon Li + via szetszwo) + + HDFS-4053. Increase the default block size. (eli) + OPTIMIZATIONS BUG FIXES @@ -440,6 +457,26 @@ Release 2.0.3-alpha - Unreleased HDFS-4044. Duplicate ChecksumType definition in HDFS .proto files. (Binglin Chang via suresh) + HDFS-4049. Fix hflush performance regression due to nagling delays + (todd) + + HDFS-3678. Edit log files are never being purged from 2NN. (atm) + + HDFS-4058. DirectoryScanner may fail with IOOB if the directory + scanning threads return out of volume order. (eli) + + HDFS-3985. Add timeouts to TestMulitipleNNDataBlockScanner. (todd via eli) + + HDFS-4061. TestBalancer and TestUnderReplicatedBlocks need timeouts. (eli) + + HDFS-3997. OfflineImageViewer incorrectly passes value of imageVersion when + visiting IS_COMPRESSED element. (Mithun Radhakrishnan via atm) + + HDFS-4055. TestAuditLogs is flaky. (Binglin Chang via eli) + + HDFS-4072. On file deletion remove corresponding blocks pending + replications. 
(Jing Zhao via suresh) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Oct 19 18:49:38 2012 @@ -31,7 +31,7 @@ import org.apache.hadoop.fs.CommonConfig public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_BLOCK_SIZE_KEY = "dfs.blocksize"; - public static final long DFS_BLOCK_SIZE_DEFAULT = 64*1024*1024; + public static final long DFS_BLOCK_SIZE_DEFAULT = 128*1024*1024; public static final String DFS_REPLICATION_KEY = "dfs.replication"; public static final short DFS_REPLICATION_DEFAULT = 3; public static final String DFS_STREAM_BUFFER_SIZE_KEY = "dfs.stream-buffer-size"; @@ -162,6 +162,8 @@ public class DFSConfigKeys extends Commo public static final int DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_DEFAULT = 2; public static final String DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY = "dfs.namenode.num.extra.edits.retained"; public static final int DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT = 1000000; //1M + public static final String DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY = "dfs.namenode.max.extra.edits.segments.retained"; + public static final int DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT = 10000; // 10k public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_KEY = "dfs.namenode.min.supported.datanode.version"; public static final String DFS_NAMENODE_MIN_SUPPORTED_DATANODE_VERSION_DEFAULT = "3.0.0-SNAPSHOT"; Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Fri Oct 19 18:49:38 2012 @@ -37,12 +37,12 @@ import org.apache.hadoop.classification. 
public class DatanodeID implements Comparable { public static final DatanodeID[] EMPTY_ARRAY = {}; - protected String ipAddr; // IP address - protected String hostName; // hostname - protected String storageID; // unique per cluster storageID - protected int xferPort; // data streaming port - protected int infoPort; // info server port - protected int ipcPort; // IPC server port + private String ipAddr; // IP address + private String hostName; // hostname + private String storageID; // unique per cluster storageID + private int xferPort; // data streaming port + private int infoPort; // info server port + private int ipcPort; // IPC server port public DatanodeID(DatanodeID from) { this(from.getIpAddr(), Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Fri Oct 19 18:49:38 2012 @@ -37,13 +37,13 @@ import org.apache.hadoop.util.Time; @InterfaceAudience.Private @InterfaceStability.Evolving public class DatanodeInfo extends DatanodeID implements Node { - protected long capacity; - protected long dfsUsed; - protected long remaining; - protected long blockPoolUsed; - protected long lastUpdate; - protected int xceiverCount; - protected String location = NetworkTopology.DEFAULT_RACK; + private long capacity; + private long dfsUsed; + private long remaining; + private long blockPoolUsed; + private long lastUpdate; + private int xceiverCount; + private String location = NetworkTopology.DEFAULT_RACK; // Datanode administrative states public enum AdminStates { @@ -81,8 +81,7 @@ public class DatanodeInfo extends Datano this.lastUpdate = from.getLastUpdate(); this.xceiverCount = from.getXceiverCount(); this.location = from.getNetworkLocation(); - this.adminState = from.adminState; - this.hostName = from.hostName; + this.adminState = from.getAdminState(); } public DatanodeInfo(DatanodeID nodeID) { Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java Fri Oct 19 18:49:38 2012 @@ -53,14 +53,8 @@ public class PacketReceiver implements C private final boolean useDirectBuffers; /** - * Internal buffer for reading the length prefixes at the start of - * the packet. 
- */ - private final ByteBuffer lengthPrefixBuf = ByteBuffer.allocate( - PacketHeader.PKT_LENGTHS_LEN); - - /** - * The entirety of the most recently read packet, excepting the + * The entirety of the most recently read packet. + * The first PKT_LENGTHS_LEN bytes of this buffer are the * length prefixes. */ private ByteBuffer curPacketBuf = null; @@ -82,6 +76,7 @@ public class PacketReceiver implements C public PacketReceiver(boolean useDirectBuffers) { this.useDirectBuffers = useDirectBuffers; + reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN); } public PacketHeader getHeader() { @@ -133,11 +128,12 @@ public class PacketReceiver implements C // checksums were not requested // DATA the actual block data Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock()); - - lengthPrefixBuf.clear(); - doReadFully(ch, in, lengthPrefixBuf); - lengthPrefixBuf.flip(); - int payloadLen = lengthPrefixBuf.getInt(); + + curPacketBuf.clear(); + curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN); + doReadFully(ch, in, curPacketBuf); + curPacketBuf.flip(); + int payloadLen = curPacketBuf.getInt(); if (payloadLen < Ints.BYTES) { // The "payload length" includes its own length. Therefore it @@ -146,7 +142,7 @@ public class PacketReceiver implements C payloadLen); } int dataPlusChecksumLen = payloadLen - Ints.BYTES; - int headerLen = lengthPrefixBuf.getShort(); + int headerLen = curPacketBuf.getShort(); if (headerLen < 0) { throw new IOException("Invalid header length " + headerLen); } @@ -166,13 +162,17 @@ public class PacketReceiver implements C // Make sure we have space for the whole packet, and // read it. - reallocPacketBuf(dataPlusChecksumLen + headerLen); + reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN + + dataPlusChecksumLen + headerLen); curPacketBuf.clear(); - curPacketBuf.limit(dataPlusChecksumLen + headerLen); + curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN); + curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN + + dataPlusChecksumLen + headerLen); doReadFully(ch, in, curPacketBuf); curPacketBuf.flip(); + curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN); - // Extract the header from the front of the buffer. 
+ // Extract the header from the front of the buffer (after the length prefixes) byte[] headerBuf = new byte[headerLen]; curPacketBuf.get(headerBuf); if (curHeader == null) { @@ -197,10 +197,6 @@ public class PacketReceiver implements C public void mirrorPacketTo(DataOutputStream mirrorOut) throws IOException { Preconditions.checkState(!useDirectBuffers, "Currently only supported for non-direct buffers"); - assert lengthPrefixBuf.capacity() == PacketHeader.PKT_LENGTHS_LEN; - mirrorOut.write(lengthPrefixBuf.array(), - lengthPrefixBuf.arrayOffset(), - lengthPrefixBuf.capacity()); mirrorOut.write(curPacketBuf.array(), curPacketBuf.arrayOffset(), curPacketBuf.remaining()); @@ -223,23 +219,36 @@ public class PacketReceiver implements C private void reslicePacket( int headerLen, int checksumsLen, int dataLen) { + // Packet structure (refer to doRead() for details): + // PLEN HLEN HEADER CHECKSUMS DATA + // 32-bit 16-bit + // |--- lenThroughHeader ----| + // |----------- lenThroughChecksums ----| + // |------------------- lenThroughData ------| + int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen; + int lenThroughChecksums = lenThroughHeader + checksumsLen; + int lenThroughData = lenThroughChecksums + dataLen; + assert dataLen >= 0 : "invalid datalen: " + dataLen; - - assert curPacketBuf.position() == headerLen; - assert checksumsLen + dataLen == curPacketBuf.remaining() : + assert curPacketBuf.position() == lenThroughHeader; + assert curPacketBuf.limit() == lenThroughData : "headerLen= " + headerLen + " clen=" + checksumsLen + " dlen=" + dataLen + " rem=" + curPacketBuf.remaining(); - - curPacketBuf.position(headerLen); - curPacketBuf.limit(headerLen + checksumsLen); + + // Slice the checksums. + curPacketBuf.position(lenThroughHeader); + curPacketBuf.limit(lenThroughChecksums); curChecksumSlice = curPacketBuf.slice(); - curPacketBuf.position(headerLen + checksumsLen); - curPacketBuf.limit(headerLen + checksumsLen + dataLen); + // Slice the data. + curPacketBuf.position(lenThroughChecksums); + curPacketBuf.limit(lenThroughData); curDataSlice = curPacketBuf.slice(); + // Reset buffer to point to the entirety of the packet (including + // length prefixes) curPacketBuf.position(0); - curPacketBuf.limit(headerLen + checksumsLen + dataLen); + curPacketBuf.limit(lenThroughData); } @@ -258,12 +267,21 @@ public class PacketReceiver implements C // one. 
if (curPacketBuf == null || curPacketBuf.capacity() < atLeastCapacity) { - returnPacketBufToPool(); + ByteBuffer newBuf; if (useDirectBuffers) { - curPacketBuf = bufferPool.getBuffer(atLeastCapacity); + newBuf = bufferPool.getBuffer(atLeastCapacity); } else { - curPacketBuf = ByteBuffer.allocate(atLeastCapacity); + newBuf = ByteBuffer.allocate(atLeastCapacity); } + // If reallocing an existing buffer, copy the old packet length + // prefixes over + if (curPacketBuf != null) { + curPacketBuf.flip(); + newBuf.put(curPacketBuf); + } + + returnPacketBufToPool(); + curPacketBuf = newBuf; } } Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Oct 19 18:49:38 2012 @@ -288,7 +288,7 @@ public class BlockManager { } private static BlockTokenSecretManager createBlockTokenSecretManager( - final Configuration conf) throws IOException { + final Configuration conf) { final boolean isEnabled = conf.getBoolean( DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT); @@ -1260,7 +1260,7 @@ public class BlockManager { // Move the block-replication into a "pending" state. // The reason we use 'pending' is so we can retry // replications that fail after an appropriate amount of time. - pendingReplications.add(block, targets.length); + pendingReplications.increment(block, targets.length); if(NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug( "BLOCK* block " + block @@ -1306,8 +1306,11 @@ public class BlockManager { /** * Choose target datanodes according to the replication policy. - * @throws IOException if the number of targets < minimum replication. - * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, HashMap, long) + * + * @throws IOException + * if the number of targets < minimum replication. + * @see BlockPlacementPolicy#chooseTarget(String, int, DatanodeDescriptor, + * List, boolean, HashMap, long) */ public DatanodeDescriptor[] chooseTarget(final String src, final int numOfReplicas, final DatanodeDescriptor client, @@ -1811,7 +1814,7 @@ assert storedBlock.findDatanode(dn) < 0 /** * Queue the given reported block for later processing in the - * standby node. {@see PendingDataNodeMessages}. + * standby node. @see PendingDataNodeMessages. * @param reason a textual reason to report in the debug logs */ private void queueReportedBlock(DatanodeDescriptor dn, Block block, @@ -1976,14 +1979,15 @@ assert storedBlock.findDatanode(dn) < 0 } /** - * Faster version of {@link addStoredBlock()}, intended for use with - * initial block report at startup. If not in startup safe mode, will - * call standard addStoredBlock(). - * Assumes this method is called "immediately" so there is no need to - * refresh the storedBlock from blocksMap. 
- * Doesn't handle underReplication/overReplication, or worry about + * Faster version of + * {@link #addStoredBlock(BlockInfo, DatanodeDescriptor, DatanodeDescriptor, boolean)} + * , intended for use with initial block report at startup. If not in startup + * safe mode, will call standard addStoredBlock(). Assumes this method is + * called "immediately" so there is no need to refresh the storedBlock from + * blocksMap. Doesn't handle underReplication/overReplication, or worry about * pendingReplications or corruptReplicas, because it's in startup safe mode. * Doesn't log every block, because there are typically millions of them. + * * @throws IOException */ private void addStoredBlockImmediate(BlockInfo storedBlock, @@ -2505,7 +2509,7 @@ assert storedBlock.findDatanode(dn) < 0 // // Modify the blocks->datanode map and node's map. // - pendingReplications.remove(block); + pendingReplications.decrement(block); processAndHandleReportedBlock(node, block, ReplicaState.FINALIZED, delHintNode); } @@ -2641,7 +2645,7 @@ assert storedBlock.findDatanode(dn) < 0 } /** - * Simpler, faster form of {@link countNodes()} that only returns the number + * Simpler, faster form of {@link #countNodes(Block)} that only returns the number * of live nodes. If in startup safemode (or its 30-sec extension period), * then it gains speed by ignoring issues of excess replicas or nodes * that are decommissioned or in process of becoming decommissioned. @@ -2790,6 +2794,8 @@ assert storedBlock.findDatanode(dn) < 0 addToInvalidates(block); corruptReplicas.removeFromCorruptReplicasMap(block); blocksMap.removeBlock(block); + // Remove the block from pendingReplications + pendingReplications.remove(block); if (postponedMisreplicatedBlocks.remove(block)) { postponedMisreplicatedBlocksCount--; } @@ -2856,6 +2862,9 @@ assert storedBlock.findDatanode(dn) < 0 * @return number of blocks scheduled for removal during this iteration. 
*/ private int invalidateWorkForOneNode(String nodeId) { + final List toInvalidate; + final DatanodeDescriptor dn; + namesystem.writeLock(); try { // blocks should not be replicated or removed if safe mode is on @@ -2865,10 +2874,23 @@ assert storedBlock.findDatanode(dn) < 0 } // get blocks to invalidate for the nodeId assert nodeId != null; - return invalidateBlocks.invalidateWork(nodeId); + dn = datanodeManager.getDatanode(nodeId); + if (dn == null) { + invalidateBlocks.remove(nodeId); + return 0; + } + toInvalidate = invalidateBlocks.invalidateWork(nodeId, dn); + if (toInvalidate == null) { + return 0; + } } finally { namesystem.writeUnlock(); } + if (NameNode.stateChangeLog.isInfoEnabled()) { + NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName() + + ": ask " + dn + " to delete " + toInvalidate); + } + return toInvalidate.size(); } boolean blockHasEnoughRacks(Block b) { Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Fri Oct 19 18:49:38 2012 @@ -276,11 +276,11 @@ public class DatanodeDescriptor extends } public void resetBlocks() { - this.capacity = 0; - this.remaining = 0; - this.blockPoolUsed = 0; - this.dfsUsed = 0; - this.xceiverCount = 0; + setCapacity(0); + setRemaining(0); + setBlockPoolUsed(0); + setDfsUsed(0); + setXceiverCount(0); this.blockList = null; this.invalidateBlocks.clear(); this.volumeFailures = 0; @@ -303,15 +303,15 @@ public class DatanodeDescriptor extends */ public void updateHeartbeat(long capacity, long dfsUsed, long remaining, long blockPoolUsed, int xceiverCount, int volFailures) { - this.capacity = capacity; - this.dfsUsed = dfsUsed; - this.remaining = remaining; - this.blockPoolUsed = blockPoolUsed; - this.lastUpdate = Time.now(); - this.xceiverCount = xceiverCount; + setCapacity(capacity); + setRemaining(remaining); + setBlockPoolUsed(blockPoolUsed); + setDfsUsed(dfsUsed); + setXceiverCount(xceiverCount); + setLastUpdate(Time.now()); this.volumeFailures = volFailures; this.heartbeatedSinceFailover = true; - rollBlocksScheduled(lastUpdate); + rollBlocksScheduled(getLastUpdate()); } /** Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original) +++ 
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Oct 19 18:49:38 2012 @@ -567,7 +567,7 @@ public class DatanodeManager { /** * Decommission the node if it is in exclude list. */ - private void checkDecommissioning(DatanodeDescriptor nodeReg, String ipAddr) { + private void checkDecommissioning(DatanodeDescriptor nodeReg) { // If the registered node is in exclude list, then decommission it if (inExcludedHostsList(nodeReg)) { startDecommission(nodeReg); @@ -713,7 +713,7 @@ public class DatanodeManager { // also treat the registration message as a heartbeat heartbeatManager.register(nodeS); - checkDecommissioning(nodeS, dnAddress); + checkDecommissioning(nodeS); return; } @@ -733,7 +733,7 @@ public class DatanodeManager { = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK); resolveNetworkLocation(nodeDescr); addDatanode(nodeDescr); - checkDecommissioning(nodeDescr, dnAddress); + checkDecommissioning(nodeDescr); // also treat the registration message as a heartbeat // no need to update its timestamp @@ -885,7 +885,7 @@ public class DatanodeManager { * @return Return the current number of stale DataNodes (detected by * HeartbeatManager). */ - int getNumStaleNodes() { + public int getNumStaleNodes() { return this.numStaleNodes; } Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Fri Oct 19 18:49:38 2012 @@ -134,26 +134,7 @@ class InvalidateBlocks { return new ArrayList(node2blocks.keySet()); } - /** Invalidate work for the storage. 
*/ - int invalidateWork(final String storageId) { - final DatanodeDescriptor dn = datanodeManager.getDatanode(storageId); - if (dn == null) { - remove(storageId); - return 0; - } - final List toInvalidate = invalidateWork(storageId, dn); - if (toInvalidate == null) { - return 0; - } - - if (NameNode.stateChangeLog.isInfoEnabled()) { - NameNode.stateChangeLog.info("BLOCK* " + getClass().getSimpleName() - + ": ask " + dn + " to delete " + toInvalidate); - } - return toInvalidate.size(); - } - - private synchronized List invalidateWork( + synchronized List invalidateWork( final String storageId, final DatanodeDescriptor dn) { final LightWeightHashSet set = node2blocks.get(storageId); if (set == null) { Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java Fri Oct 19 18:49:38 2012 @@ -72,7 +72,7 @@ class PendingReplicationBlocks { /** * Add a block to the list of pending Replications */ - void add(Block block, int numReplicas) { + void increment(Block block, int numReplicas) { synchronized (pendingReplications) { PendingBlockInfo found = pendingReplications.get(block); if (found == null) { @@ -89,7 +89,7 @@ class PendingReplicationBlocks { * Decrement the number of pending replication requests * for this block. */ - void remove(Block block) { + void decrement(Block block) { synchronized (pendingReplications) { PendingBlockInfo found = pendingReplications.get(block); if (found != null) { @@ -104,6 +104,16 @@ class PendingReplicationBlocks { } } + /** + * Remove the record about the given block from pendingReplications. 
+ * @param block The given block whose pending replication requests need to be + * removed + */ + void remove(Block block) { + synchronized (pendingReplications) { + pendingReplications.remove(block); + } + } public void clear() { synchronized (pendingReplications) { Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java Fri Oct 19 18:49:38 2012 @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.common; +import java.util.concurrent.atomic.AtomicLong; + import org.apache.hadoop.classification.InterfaceAudience; /**************************************************************** @@ -35,7 +37,7 @@ public class GenerationStamp implements */ public static final long GRANDFATHER_GENERATION_STAMP = 0; - private volatile long genstamp; + private AtomicLong genstamp = new AtomicLong(); /** * Create a new instance, initialized to FIRST_VALID_STAMP. @@ -48,35 +50,36 @@ public class GenerationStamp implements * Create a new instance, initialized to the specified value. */ GenerationStamp(long stamp) { - this.genstamp = stamp; + genstamp.set(stamp); } /** * Returns the current generation stamp */ public long getStamp() { - return this.genstamp; + return genstamp.get(); } /** * Sets the current generation stamp */ public void setStamp(long stamp) { - this.genstamp = stamp; + genstamp.set(stamp); } /** * First increments the counter and then returns the stamp */ - public synchronized long nextStamp() { - this.genstamp++; - return this.genstamp; + public long nextStamp() { + return genstamp.incrementAndGet(); } @Override // Comparable public int compareTo(GenerationStamp that) { - return this.genstamp < that.genstamp ? -1 : - this.genstamp > that.genstamp ? 1 : 0; + long stamp1 = this.genstamp.get(); + long stamp2 = that.genstamp.get(); + return stamp1 < stamp2 ? -1 : + stamp1 > stamp2 ? 
1 : 0; } @Override // Object @@ -89,6 +92,7 @@ public class GenerationStamp implements @Override // Object public int hashCode() { - return (int) (genstamp^(genstamp>>>32)); + long stamp = genstamp.get(); + return (int) (stamp^(stamp>>>32)); } } Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java Fri Oct 19 18:49:38 2012 @@ -431,16 +431,16 @@ public class DirectoryScanner implements private Map getDiskReport() { // First get list of data directories final List volumes = dataset.getVolumes(); - ArrayList dirReports = - new ArrayList(volumes.size()); - + + // Use an array since the threads may return out of order and + // compilersInProgress#keySet may return out of order as well. + ScanInfoPerBlockPool[] dirReports = new ScanInfoPerBlockPool[volumes.size()]; + Map> compilersInProgress = new HashMap>(); + for (int i = 0; i < volumes.size(); i++) { - if (!isValid(dataset, volumes.get(i))) { - // volume is invalid - dirReports.add(i, null); - } else { + if (isValid(dataset, volumes.get(i))) { ReportCompiler reportCompiler = new ReportCompiler(volumes.get(i)); Future result = @@ -452,7 +452,7 @@ public class DirectoryScanner implements for (Entry> report : compilersInProgress.entrySet()) { try { - dirReports.add(report.getKey(), report.getValue().get()); + dirReports[report.getKey()] = report.getValue().get(); } catch (Exception ex) { LOG.error("Error compiling report", ex); // Propagate ex to DataBlockScanner to deal with @@ -465,7 +465,7 @@ public class DirectoryScanner implements for (int i = 0; i < volumes.size(); i++) { if (isValid(dataset, volumes.get(i))) { // volume is still valid - list.addAll(dirReports.get(i)); + list.addAll(dirReports[i]); } } Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Oct 19 18:49:38 2012 @@ -290,13 +290,18 @@ public class FSDirectory implements Clos try { newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE); } catch (IOException e) { + if(NameNode.stateChangeLog.isDebugEnabled()) { + NameNode.stateChangeLog.debug( + "DIR* FSDirectory.unprotectedAddFile: exception when add " + path + + " to the file system", e); + } return null; } return newNode; } INodeDirectory addToParent(byte[] src, INodeDirectory parentINode, - INode 
newNode, boolean propagateModTime) throws UnresolvedLinkException { + INode newNode, boolean propagateModTime) { // NOTE: This does not update space counts for parents INodeDirectory newParent = null; writeLock(); Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Oct 19 18:49:38 2012 @@ -1174,6 +1174,11 @@ public class FSEditLog implements LogsPu // TODO: are we sure this is OK? } } + + public void selectInputStreams(Collection streams, + long fromTxId, boolean inProgressOk) { + journalSet.selectInputStreams(streams, fromTxId, inProgressOk); + } public Collection selectInputStreams( long fromTxId, long toAtLeastTxId) throws IOException { @@ -1191,7 +1196,7 @@ public class FSEditLog implements LogsPu long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery, boolean inProgressOk) throws IOException { List streams = new ArrayList(); - journalSet.selectInputStreams(streams, fromTxId, inProgressOk); + selectInputStreams(streams, fromTxId, inProgressOk); try { checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk); Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Oct 19 18:49:38 2012 @@ -4677,6 +4677,13 @@ public class FSNamesystem implements Nam public int getNumDeadDataNodes() { return getBlockManager().getDatanodeManager().getNumDeadDataNodes(); } + + @Override // FSNamesystemMBean + @Metric({"StaleDataNodes", + "Number of datanodes marked stale due to delayed heartbeat"}) + public int getNumStaleDataNodes() { + return getBlockManager().getDatanodeManager().getNumStaleNodes(); + } /** * Sets the generation stamp for this filesystem Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java (original) +++ 
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java Fri Oct 19 18:49:38 2012 @@ -247,6 +247,11 @@ public class FileJournalManager implemen LOG.debug(this + ": selecting input streams starting at " + fromTxId + (inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") + "from among " + elfs.size() + " candidate file(s)"); + addStreamsToCollectionFromFiles(elfs, streams, fromTxId, inProgressOk); + } + + static void addStreamsToCollectionFromFiles(Collection elfs, + Collection streams, long fromTxId, boolean inProgressOk) { for (EditLogFile elf : elfs) { if (elf.isInProgress()) { if (!inProgressOk) { Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Fri Oct 19 18:49:38 2012 @@ -96,13 +96,6 @@ abstract class INode implements Comparab } } - protected INode() { - name = null; - parent = null; - modificationTime = 0; - accessTime = 0; - } - INode(PermissionStatus permissions, long mTime, long atime) { this.name = null; this.parent = null; Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java Fri Oct 19 18:49:38 2012 @@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.na import java.io.Closeable; import java.io.IOException; -import java.util.Collection; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -56,21 +55,6 @@ public interface JournalManager extends */ void finalizeLogSegment(long firstTxId, long lastTxId) throws IOException; - /** - * Get a list of edit log input streams. The list will start with the - * stream that contains fromTxnId, and continue until the end of the journal - * being managed. 
- * - * @param fromTxnId the first transaction id we want to read - * @param inProgressOk whether or not in-progress streams should be returned - * - * @return a list of streams - * @throws IOException if the underlying storage has an error or is otherwise - * inaccessible - */ - void selectInputStreams(Collection streams, - long fromTxnId, boolean inProgressOk) throws IOException; - /** * Set the amount of memory that this stream should use to buffer edits */ Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java Fri Oct 19 18:49:38 2012 @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.namenode; import java.io.IOException; +import java.util.Collection; /** * Interface used to abstract over classes which manage edit logs that may need @@ -33,5 +34,20 @@ interface LogsPurgeable { * @throws IOException in the event of error */ public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException; - + + /** + * Get a list of edit log input streams. The list will start with the + * stream that contains fromTxnId, and continue until the end of the journal + * being managed. + * + * @param fromTxId the first transaction id we want to read + * @param inProgressOk whether or not in-progress streams should be returned + * + * @return a list of streams + * @throws IOException if the underlying storage has an error or is otherwise + * inaccessible + */ + void selectInputStreams(Collection streams, + long fromTxId, boolean inProgressOk) throws IOException; + } Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java Fri Oct 19 18:49:38 2012 @@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.server.na import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.TreeSet; @@ -32,6 +34,7 @@ import org.apache.hadoop.hdfs.server.nam import org.apache.hadoop.hdfs.util.MD5FileUtils; import com.google.common.base.Preconditions; +import com.google.common.collect.ComparisonChain; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -48,6 +51,7 @@ public class NNStorageRetentionManager { private final 
int numCheckpointsToRetain; private final long numExtraEditsToRetain; + private final int maxExtraEditsSegmentsToRetain; private static final Log LOG = LogFactory.getLog( NNStorageRetentionManager.class); private final NNStorage storage; @@ -65,6 +69,9 @@ public class NNStorageRetentionManager { this.numExtraEditsToRetain = conf.getLong( DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_DEFAULT); + this.maxExtraEditsSegmentsToRetain = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY, + DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_DEFAULT); Preconditions.checkArgument(numCheckpointsToRetain > 0, "Must retain at least one checkpoint"); Preconditions.checkArgument(numExtraEditsToRetain >= 0, @@ -94,7 +101,39 @@ public class NNStorageRetentionManager { // provide a "cushion" of older txns that we keep, which is // handy for HA, where a remote node may not have as many // new images. - long purgeLogsFrom = Math.max(0, minImageTxId + 1 - numExtraEditsToRetain); + // + // First, determine the target number of extra transactions to retain based + // on the configured amount. + long minimumRequiredTxId = minImageTxId + 1; + long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain); + + ArrayList editLogs = new ArrayList(); + purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false); + Collections.sort(editLogs, new Comparator() { + @Override + public int compare(EditLogInputStream a, EditLogInputStream b) { + return ComparisonChain.start() + .compare(a.getFirstTxId(), b.getFirstTxId()) + .compare(a.getLastTxId(), b.getLastTxId()) + .result(); + } + }); + + // Next, adjust the number of transactions to retain if doing so would mean + // keeping too many segments around. + while (editLogs.size() > maxExtraEditsSegmentsToRetain) { + purgeLogsFrom = editLogs.get(0).getFirstTxId(); + editLogs.remove(0); + } + + // Finally, ensure that we're not trying to purge any transactions that we + // actually need. 
+ if (purgeLogsFrom > minimumRequiredTxId) { + throw new AssertionError("Should not purge more edits than required to " + + "restore: " + purgeLogsFrom + " should be <= " + + minimumRequiredTxId); + } + purgeableLogs.purgeLogsOlderThan(purgeLogsFrom); } Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Fri Oct 19 18:49:38 2012 @@ -751,6 +751,24 @@ public class SecondaryNameNode implement } } } + + @Override + public void selectInputStreams(Collection streams, + long fromTxId, boolean inProgressOk) { + Iterator iter = storage.dirIterator(); + while (iter.hasNext()) { + StorageDirectory dir = iter.next(); + List editFiles; + try { + editFiles = FileJournalManager.matchEditLogs( + dir.getCurrentDir()); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + FileJournalManager.addStreamsToCollectionFromFiles(editFiles, streams, + fromTxId, inProgressOk); + } + } } Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java Fri Oct 19 18:49:38 2012 @@ -112,4 +112,10 @@ public interface FSNamesystemMBean { * @return number of dead data nodes */ public int getNumDeadDataNodes(); + + /** + * Number of stale data nodes + * @return number of stale data nodes + */ + public int getNumStaleDataNodes(); } Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeRegistration.java Fri Oct 19 18:49:38 2012 @@ -82,9 +82,9 @@ public class DatanodeRegistration extend public String toString() { return getClass().getSimpleName() + "(" + getIpAddr() - + 
", storageID=" + storageID - + ", infoPort=" + infoPort - + ", ipcPort=" + ipcPort + + ", storageID=" + getStorageID() + + ", infoPort=" + getInfoPort() + + ", ipcPort=" + getIpcPort() + ", storageInfo=" + storageInfo + ")"; } Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Fri Oct 19 18:49:38 2012 @@ -165,7 +165,7 @@ class ImageLoaderCurrent implements Imag if (LayoutVersion.supports(Feature.FSIMAGE_COMPRESSION, imageVersion)) { boolean isCompressed = in.readBoolean(); - v.visit(ImageElement.IS_COMPRESSED, imageVersion); + v.visit(ImageElement.IS_COMPRESSED, String.valueOf(isCompressed)); if (isCompressed) { String codecClassName = Text.readString(in); v.visit(ImageElement.COMPRESS_CODEC, codecClassName); Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Fri Oct 19 18:49:38 2012 @@ -358,7 +358,7 @@ dfs.blocksize - 67108864 + 134217728 The default block size for new files, in bytes. You can use the following suffix (case insensitive): @@ -660,6 +660,20 @@ edits in order to start again. Typically each edit is on the order of a few hundred bytes, so the default of 1 million edits should be on the order of hundreds of MBs or low GBs. + + NOTE: Fewer extra edits may be retained than value specified for this setting + if doing so would mean that more segments would be retained than the number + configured by dfs.namenode.max.extra.edits.segments.retained. + + + + + dfs.namenode.max.extra.edits.segments.retained + 10000 + The maximum number of extra edit log segments which should be retained + beyond what is minimally necessary for a NN restart. When used in conjunction with + dfs.namenode.num.extra.edits.retained, this configuration property serves to cap + the number of extra edits files to a reasonable value. 
Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMultiThreadedHflush.java Fri Oct 19 18:49:38 2012 @@ -20,45 +20,40 @@ package org.apache.hadoop.hdfs; import java.io.IOException; import java.util.ArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; -import org.apache.hadoop.hdfs.server.namenode.LeaseManager; -import org.apache.hadoop.hdfs.server.namenode.NameNode; -import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; -import org.apache.log4j.Level; +import org.apache.hadoop.metrics2.util.Quantile; +import org.apache.hadoop.metrics2.util.SampleQuantiles; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; import org.junit.Test; +import com.google.common.base.Stopwatch; + /** * This class tests hflushing concurrently from many threads. */ public class TestMultiThreadedHflush { static final int blockSize = 1024*1024; - static final int numBlocks = 10; - static final int fileSize = numBlocks * blockSize + 1; private static final int NUM_THREADS = 10; private static final int WRITE_SIZE = 517; private static final int NUM_WRITES_PER_THREAD = 1000; private byte[] toWrite = null; - - { - ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL); - ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.ALL); - ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); - ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL); - } + + private final SampleQuantiles quantiles = new SampleQuantiles( + new Quantile[] { + new Quantile(0.50, 0.050), + new Quantile(0.75, 0.025), new Quantile(0.90, 0.010), + new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) }); /* * creates a file but does not close it @@ -104,8 +99,11 @@ public class TestMultiThreadedHflush { } private void doAWrite() throws IOException { + Stopwatch sw = new Stopwatch().start(); stm.write(toWrite); stm.hflush(); + long micros = sw.elapsedTime(TimeUnit.MICROSECONDS); + quantiles.insert(micros); } } @@ -115,14 +113,28 @@ public class TestMultiThreadedHflush { * They all finish before the file is closed. 
*/ @Test - public void testMultipleHflushers() throws Exception { + public void testMultipleHflushersRepl1() throws Exception { + doTestMultipleHflushers(1); + } + + @Test + public void testMultipleHflushersRepl3() throws Exception { + doTestMultipleHflushers(3); + } + + private void doTestMultipleHflushers(int repl) throws Exception { Configuration conf = new Configuration(); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(repl) + .build(); FileSystem fs = cluster.getFileSystem(); Path p = new Path("/multiple-hflushers.dat"); try { - doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD); + doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, + NUM_WRITES_PER_THREAD, repl); + System.out.println("Latency quantiles (in microseconds):\n" + + quantiles); } finally { fs.close(); cluster.shutdown(); @@ -200,13 +212,13 @@ public class TestMultiThreadedHflush { } public void doMultithreadedWrites( - Configuration conf, Path p, int numThreads, int bufferSize, int numWrites) - throws Exception { + Configuration conf, Path p, int numThreads, int bufferSize, int numWrites, + int replication) throws Exception { initBuffer(bufferSize); // create a new file. FileSystem fs = p.getFileSystem(conf); - FSDataOutputStream stm = createFile(fs, p, 1); + FSDataOutputStream stm = createFile(fs, p, replication); System.out.println("Created file simpleFlush.dat"); // There have been a couple issues with flushing empty buffers, so do @@ -240,20 +252,41 @@ public class TestMultiThreadedHflush { } public static void main(String args[]) throws Exception { - if (args.length != 1) { - System.err.println( - "usage: " + TestMultiThreadedHflush.class.getSimpleName() + - " "); - System.exit(1); + System.exit(ToolRunner.run(new CLIBenchmark(), args)); + } + + private static class CLIBenchmark extends Configured implements Tool { + public int run(String args[]) throws Exception { + if (args.length != 1) { + System.err.println( + "usage: " + TestMultiThreadedHflush.class.getSimpleName() + + " "); + System.err.println( + "Configurations settable by -D options:\n" + + " num.threads [default 10] - how many threads to run\n" + + " write.size [default 511] - bytes per write\n" + + " num.writes [default 50000] - how many writes to perform"); + System.exit(1); + } + TestMultiThreadedHflush test = new TestMultiThreadedHflush(); + Configuration conf = getConf(); + Path p = new Path(args[0]); + + int numThreads = conf.getInt("num.threads", 10); + int writeSize = conf.getInt("write.size", 511); + int numWrites = conf.getInt("num.writes", 50000); + int replication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, + DFSConfigKeys.DFS_REPLICATION_DEFAULT); + + Stopwatch sw = new Stopwatch().start(); + test.doMultithreadedWrites(conf, p, numThreads, writeSize, numWrites, + replication); + sw.stop(); + + System.out.println("Finished in " + sw.elapsedMillis() + "ms"); + System.out.println("Latency quantiles (in microseconds):\n" + + test.quantiles); + return 0; } - TestMultiThreadedHflush test = new TestMultiThreadedHflush(); - Configuration conf = new Configuration(); - Path p = new Path(args[0]); - long st = System.nanoTime(); - test.doMultithreadedWrites(conf, p, 10, 511, 50000); - long et = System.nanoTime(); - - System.out.println("Finished in " + ((et - st) / 1000000) + "ms"); } - } Modified: 
hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java Fri Oct 19 18:49:38 2012 @@ -372,7 +372,7 @@ public class TestBalancer { * Test parse method in Balancer#Cli class with threshold value out of * boundaries. */ - @Test + @Test(timeout=100000) public void testBalancerCliParseWithThresholdOutOfBoundaries() { String parameters[] = new String[] { "-threshold", "0" }; String reason = "IllegalArgumentException is expected when threshold value" @@ -394,7 +394,7 @@ public class TestBalancer { /** Test a cluster with even distribution, * then a new empty node is added to the cluster*/ - @Test + @Test(timeout=100000) public void testBalancer0() throws Exception { testBalancer0Internal(new HdfsConfiguration()); } @@ -406,7 +406,7 @@ public class TestBalancer { } /** Test unevenly distributed cluster */ - @Test + @Test(timeout=100000) public void testBalancer1() throws Exception { testBalancer1Internal(new HdfsConfiguration()); } @@ -419,7 +419,7 @@ public class TestBalancer { new String[] {RACK0, RACK1}); } - @Test + @Test(timeout=100000) public void testBalancer2() throws Exception { testBalancer2Internal(new HdfsConfiguration()); } @@ -467,8 +467,7 @@ public class TestBalancer { /** * Test parse method in Balancer#Cli class with wrong number of params */ - - @Test + @Test(timeout=100000) public void testBalancerCliParseWithWrongParams() { String parameters[] = new String[] { "-threshold" }; String reason = Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java Fri Oct 19 18:49:38 2012 @@ -191,4 +191,12 @@ public class BlockManagerTestUtil { "Must use default policy, got %s", bpp.getClass()); ((BlockPlacementPolicyDefault)bpp).setPreferLocalNode(prefer); } + + /** + * Call heartbeat check function of HeartbeatManager + * @param bm the BlockManager to manipulate + */ + public static void checkHeartbeat(BlockManager bm) { + bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck(); + } } Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingReplication.java Fri Oct 19 18:49:38 2012 @@ -20,14 +20,30 @@ package org.apache.hadoop.hdfs.server.bl import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.junit.Test; /** - * This class tests the internals of PendingReplicationBlocks.java + * This class tests the internals of PendingReplicationBlocks.java, + * as well as how PendingReplicationBlocks acts in BlockManager */ public class TestPendingReplication { final static int TIMEOUT = 3; // 3 seconds + private static final int DFS_REPLICATION_INTERVAL = 1; + // Number of datanodes in the cluster + private static final int DATANODE_COUNT = 5; @Test public void testPendingReplication() { @@ -40,7 +56,7 @@ public class TestPendingReplication { // for (int i = 0; i < 10; i++) { Block block = new Block(i, i, 0); - pendingReplications.add(block, i); + pendingReplications.increment(block, i); } assertEquals("Size of pendingReplications ", 10, pendingReplications.size()); @@ -50,15 +66,15 @@ public class TestPendingReplication { // remove one item and reinsert it // Block blk = new Block(8, 8, 0); - pendingReplications.remove(blk); // removes one replica + pendingReplications.decrement(blk); // removes one replica assertEquals("pendingReplications.getNumReplicas ", 7, pendingReplications.getNumReplicas(blk)); for (int i = 0; i < 7; i++) { - pendingReplications.remove(blk); // removes all replicas + pendingReplications.decrement(blk); // removes all replicas } assertTrue(pendingReplications.size() == 9); - pendingReplications.add(blk, 8); + pendingReplications.increment(blk, 8); assertTrue(pendingReplications.size() == 10); // @@ -86,7 +102,7 @@ public class TestPendingReplication { for (int i = 10; i < 15; i++) { Block block = new Block(i, i, 0); - pendingReplications.add(block, i); + pendingReplications.increment(block, i); } assertTrue(pendingReplications.size() == 15); @@ -116,4 +132,70 @@ public class TestPendingReplication { } pendingReplications.stop(); } + + /** + * Test if BlockManager can correctly remove corresponding pending records + * when a file is deleted + * + * @throws Exception + */ + @Test + public void testPendingAndInvalidate() throws Exception { + final Configuration CONF = new HdfsConfiguration(); + 
CONF.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024); + CONF.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + DFS_REPLICATION_INTERVAL); + CONF.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, + DFS_REPLICATION_INTERVAL); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(CONF).numDataNodes( + DATANODE_COUNT).build(); + cluster.waitActive(); + + FSNamesystem namesystem = cluster.getNamesystem(); + BlockManager bm = namesystem.getBlockManager(); + DistributedFileSystem fs = cluster.getFileSystem(); + try { + // 1. create a file + Path filePath = new Path("/tmp.txt"); + DFSTestUtil.createFile(fs, filePath, 1024, (short) 3, 0L); + + // 2. disable the heartbeats + for (DataNode dn : cluster.getDataNodes()) { + DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true); + } + + // 3. mark a couple of blocks as corrupt + LocatedBlock block = NameNodeAdapter.getBlockLocations( + cluster.getNameNode(), filePath.toString(), 0, 1).get(0); + cluster.getNamesystem().writeLock(); + try { + bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[0], + "TEST"); + bm.findAndMarkBlockAsCorrupt(block.getBlock(), block.getLocations()[1], + "TEST"); + } finally { + cluster.getNamesystem().writeUnlock(); + } + BlockManagerTestUtil.computeAllPendingWork(bm); + BlockManagerTestUtil.updateState(bm); + assertEquals(bm.getPendingReplicationBlocksCount(), 1L); + assertEquals(bm.pendingReplications.getNumReplicas(block.getBlock() + .getLocalBlock()), 2); + + // 4. delete the file + fs.delete(filePath, true); + // retry at most 10 times, each time sleep for 1s. Note that 10s is much + // less than the default pending record timeout (5~10min) + int retries = 10; + long pendingNum = bm.getPendingReplicationBlocksCount(); + while (pendingNum != 0 && retries-- > 0) { + Thread.sleep(1000); // let NN do the deletion + BlockManagerTestUtil.updateState(bm); + pendingNum = bm.getPendingReplicationBlocksCount(); + } + assertEquals(pendingNum, 0L); + } finally { + cluster.shutdown(); + } + } } Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestUnderReplicatedBlocks.java Fri Oct 19 18:49:38 2012 @@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.protocol.E import org.junit.Test; public class TestUnderReplicatedBlocks { - @Test + @Test(timeout=300000) // 5 min timeout public void testSetrepIncWithUnderReplicatedBlocks() throws Exception { Configuration conf = new HdfsConfiguration(); final short REPLICATION_FACTOR = 2; Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java Fri Oct 19 18:49:38 2012 @@ -74,7 +74,7 @@ public class TestMultipleNNDataBlockScan } } - @Test + @Test(timeout=120000) public void testDataBlockScanner() throws IOException, InterruptedException { setUp(); try { @@ -97,7 +97,7 @@ public class TestMultipleNNDataBlockScan } } - @Test + @Test(timeout=120000) public void testBlockScannerAfterRefresh() throws IOException, InterruptedException { setUp(); @@ -149,7 +149,7 @@ public class TestMultipleNNDataBlockScan } } - @Test + @Test(timeout=120000) public void testBlockScannerAfterRestart() throws IOException, InterruptedException { setUp(); @@ -176,7 +176,7 @@ public class TestMultipleNNDataBlockScan } } - @Test + @Test(timeout=120000) public void test2NNBlockRescanInterval() throws IOException { ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL); Configuration conf = new HdfsConfiguration(); @@ -206,7 +206,7 @@ public class TestMultipleNNDataBlockScan * * @throws Exception */ - @Test + @Test(timeout=120000) public void testBlockRescanInterval() throws IOException { ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL); Configuration conf = new HdfsConfiguration(); Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java?rev=1400219&r1=1400218&r2=1400219&view=diff ============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java Fri Oct 19 18:49:38 2012 @@ -117,7 +117,7 @@ public class TestAuditLogs { int val = istream.read(); istream.close(); verifyAuditLogs(true); - assertTrue("failed to read from file", val > 0); + assertTrue("failed to read from file", val >= 0); } /** test that allowed stat puts proper entry in audit log */ @@ -168,7 +168,7 @@ public class TestAuditLogs { istream.close(); verifyAuditLogsRepeat(true, 3); - assertTrue("failed to read from file", val > 0); + assertTrue("failed to read from file", val >= 0); } /** test that stat via webhdfs puts proper entry in audit log */ Modified: hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java?rev=1400219&r1=1400218&r2=1400219&view=diff 
============================================================================== --- hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java (original) +++ hadoop/common/branches/MR-3902/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNNStorageRetentionManager.java Fri Oct 19 18:49:38 2012 @@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.ser import static org.apache.hadoop.hdfs.server.namenode.NNStorage.getInProgressEditsFileName; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Set; @@ -196,6 +197,35 @@ public class TestNNStorageRetentionManag runTest(tc); } + @Test + public void testRetainExtraLogsLimitedSegments() throws IOException { + conf.setLong(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, + 150); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MAX_EXTRA_EDITS_SEGMENTS_RETAINED_KEY, 2); + TestCaseDescription tc = new TestCaseDescription(); + tc.addRoot("/foo1", NameNodeDirType.IMAGE); + tc.addRoot("/foo2", NameNodeDirType.EDITS); + tc.addImage("/foo1/current/" + getImageFileName(100), true); + tc.addImage("/foo1/current/" + getImageFileName(200), true); + tc.addImage("/foo1/current/" + getImageFileName(300), false); + tc.addImage("/foo1/current/" + getImageFileName(400), false); + + tc.addLog("/foo2/current/" + getFinalizedEditsFileName(1, 100), true); + // Without lowering the max segments to retain, we'd retain all segments + // going back to txid 150 (300 - 150). + tc.addLog("/foo2/current/" + getFinalizedEditsFileName(101, 175), true); + tc.addLog("/foo2/current/" + getFinalizedEditsFileName(176, 200), true); + tc.addLog("/foo2/current/" + getFinalizedEditsFileName(201, 225), true); + tc.addLog("/foo2/current/" + getFinalizedEditsFileName(226, 240), true); + // Only retain 2 extra segments. The 301-400 segment is considered required, + // not extra. 
+ tc.addLog("/foo2/current/" + getFinalizedEditsFileName(241, 275), false); + tc.addLog("/foo2/current/" + getFinalizedEditsFileName(276, 300), false); + tc.addLog("/foo2/current/" + getFinalizedEditsFileName(301, 400), false); + tc.addLog("/foo2/current/" + getInProgressEditsFileName(401), false); + runTest(tc); + } + private void runTest(TestCaseDescription tc) throws IOException { StoragePurger mockPurger = Mockito.mock(NNStorageRetentionManager.StoragePurger.class); @@ -287,8 +317,10 @@ public class TestNNStorageRetentionManag return mockStorageForDirs(sds.toArray(new StorageDirectory[0])); } + @SuppressWarnings("unchecked") public FSEditLog mockEditLog(StoragePurger purger) { final List jms = Lists.newArrayList(); + final JournalSet journalSet = new JournalSet(0); for (FakeRoot root : dirRoots.values()) { if (!root.type.isOfType(NameNodeDirType.EDITS)) continue; @@ -297,6 +329,7 @@ public class TestNNStorageRetentionManag root.mockStorageDir(), null); fjm.purger = purger; jms.add(fjm); + journalSet.add(fjm, false); } FSEditLog mockLog = Mockito.mock(FSEditLog.class); @@ -314,6 +347,18 @@ public class TestNNStorageRetentionManag return null; } }).when(mockLog).purgeLogsOlderThan(Mockito.anyLong()); + + Mockito.doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Object[] args = invocation.getArguments(); + journalSet.selectInputStreams((Collection)args[0], + (long)((Long)args[1]), (boolean)((Boolean)args[2])); + return null; + } + }).when(mockLog).selectInputStreams(Mockito.anyCollection(), + Mockito.anyLong(), Mockito.anyBoolean()); return mockLog; } }