Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Mon Jan 14 03:44:35 2013 @@ -52,6 +52,7 @@ import org.apache.hadoop.util.Daemon; class NameNodeConnector { private static final Log LOG = Balancer.LOG; private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); + private static final int MAX_NOT_CHANGED_ITERATIONS = 5; final URI nameNodeUri; final String blockpoolID; @@ -65,6 +66,8 @@ class NameNodeConnector { private final boolean encryptDataTransfer; private boolean shouldRun; private long keyUpdaterInterval; + // used for balancer + private int notChangedIterations = 0; private BlockTokenSecretManager blockTokenSecretManager; private Daemon keyupdaterthread; // AccessKeyUpdater thread private DataEncryptionKey encryptionKey; @@ -119,6 +122,20 @@ class NameNodeConnector { } } + boolean shouldContinue(long dispatchBlockMoveBytes) { + if (dispatchBlockMoveBytes > 0) { + notChangedIterations = 0; + } else { + notChangedIterations++; + if (notChangedIterations >= MAX_NOT_CHANGED_ITERATIONS) { + System.out.println("No block has been moved for " + + notChangedIterations + " iterations. Exiting..."); + return false; + } + } + return true; + } + /** Get an access token for a block. 
*/ Token getAccessToken(ExtendedBlock eb ) throws IOException { Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Jan 14 03:44:35 2013 @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Queue; import java.util.Set; import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -107,8 +108,8 @@ public class BlockManager { private volatile long corruptReplicaBlocksCount = 0L; private volatile long underReplicatedBlocksCount = 0L; private volatile long scheduledReplicationBlocksCount = 0L; - private volatile long excessBlocksCount = 0L; - private volatile long postponedMisreplicatedBlocksCount = 0L; + private AtomicLong excessBlocksCount = new AtomicLong(0L); + private AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L); /** Used by metrics */ public long getPendingReplicationBlocksCount() { @@ -132,11 +133,11 @@ public class BlockManager { } /** Used by metrics */ public long getExcessBlocksCount() { - return excessBlocksCount; + return excessBlocksCount.get(); } /** Used by metrics */ public long getPostponedMisreplicatedBlocksCount() { - return postponedMisreplicatedBlocksCount; + return postponedMisreplicatedBlocksCount.get(); } /** Used by metrics 
*/ public int getPendingDataNodeMessageCount() { @@ -170,29 +171,34 @@ public class BlockManager { */ private final Set postponedMisreplicatedBlocks = Sets.newHashSet(); - // - // Keeps a TreeSet for every named node. Each treeset contains - // a list of the blocks that are "extra" at that location. We'll - // eventually remove these extras. - // Mapping: StorageID -> TreeSet - // + /** + * Maps a StorageID to the set of blocks that are "extra" for this + * DataNode. We'll eventually remove these extras. + */ public final Map> excessReplicateMap = new TreeMap>(); - // - // Store set of Blocks that need to be replicated 1 or more times. - // We also store pending replication-orders. - // + /** + * Store set of Blocks that need to be replicated 1 or more times. + * We also store pending replication-orders. + */ public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks(); + @VisibleForTesting final PendingReplicationBlocks pendingReplications; /** The maximum number of replicas allowed for a block */ public final short maxReplication; - /** The maximum number of outgoing replication streams - * a given node should have at one time - */ + /** + * The maximum number of outgoing replication streams a given node should have + * at one time considering all but the highest priority replications needed. + */ int maxReplicationStreams; + /** + * The maximum number of outgoing replication streams a given node should have + * at one time. 
+ */ + int replicationStreamsHardLimit; /** Minimum copies needed or else write is disallowed */ public final short minReplication; /** Default number of replicas */ @@ -263,9 +269,16 @@ public class BlockManager { this.minReplication = (short)minR; this.maxReplication = (short)maxR; - this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, - DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT); - this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null; + this.maxReplicationStreams = + conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT); + this.replicationStreamsHardLimit = + conf.getInt( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, + DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT); + this.shouldCheckForEnoughRacks = + conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null + ? 
false : true; this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf); this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf); @@ -435,7 +448,8 @@ public class BlockManager { NumberReplicas numReplicas = new NumberReplicas(); // source node returned is not used chooseSourceDatanode(block, containingNodes, - containingLiveReplicasNodes, numReplicas); + containingLiveReplicasNodes, numReplicas, + UnderReplicatedBlocks.LEVEL); assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas(); int usableReplicas = numReplicas.liveReplicas() + numReplicas.decommissionedReplicas(); @@ -1052,7 +1066,7 @@ public class BlockManager { private void postponeBlock(Block blk) { if (postponedMisreplicatedBlocks.add(blk)) { - postponedMisreplicatedBlocksCount++; + postponedMisreplicatedBlocksCount.incrementAndGet(); } } @@ -1145,11 +1159,12 @@ public class BlockManager { liveReplicaNodes = new ArrayList(); NumberReplicas numReplicas = new NumberReplicas(); srcNode = chooseSourceDatanode( - block, containingNodes, liveReplicaNodes, numReplicas); + block, containingNodes, liveReplicaNodes, numReplicas, + priority); if(srcNode == null) { // block can not be replicated from any node LOG.debug("Block " + block + " cannot be repl from any node"); continue; - } + } assert liveReplicaNodes.size() == numReplicas.liveReplicas(); // do not schedule more if enough replicas is already pending @@ -1339,16 +1354,34 @@ public class BlockManager { * since the former do not have write traffic and hence are less busy. * We do not use already decommissioned nodes as a source. * Otherwise we choose a random node among those that did not reach their - * replication limit. + * replication limits. However, if the replication is of the highest priority + * and all nodes have reached their replication limits, we will choose a + * random node despite the replication limit. 
* * In addition form a list of all nodes containing the block * and calculate its replication numbers. + * + * @param block Block for which a replication source is needed + * @param containingNodes List to be populated with nodes found to contain the + * given block + * @param nodesContainingLiveReplicas List to be populated with nodes found to + * contain live replicas of the given block + * @param numReplicas NumberReplicas instance to be initialized with the + * counts of live, corrupt, excess, and + * decommissioned replicas of the given + * block. + * @param priority integer representing replication priority of the given + * block + * @return the DatanodeDescriptor of the chosen node from which to replicate + * the given block */ - private DatanodeDescriptor chooseSourceDatanode( + @VisibleForTesting + DatanodeDescriptor chooseSourceDatanode( Block block, List containingNodes, List nodesContainingLiveReplicas, - NumberReplicas numReplicas) { + NumberReplicas numReplicas, + int priority) { containingNodes.clear(); nodesContainingLiveReplicas.clear(); DatanodeDescriptor srcNode = null; @@ -1377,8 +1410,15 @@ public class BlockManager { // If so, do not select the node as src node if ((nodesCorrupt != null) && nodesCorrupt.contains(node)) continue; - if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) + if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY + && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) + { continue; // already reached replication limit + } + if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit) + { + continue; + } // the block must not be scheduled for removal on srcNode if(excessBlocks != null && excessBlocks.contains(block)) continue; @@ -1558,7 +1598,7 @@ public class BlockManager { "in block map."); } it.remove(); - postponedMisreplicatedBlocksCount--; + postponedMisreplicatedBlocksCount.decrementAndGet(); continue; } MisReplicationResult res = processMisReplicatedBlock(bi); @@ 
-1568,7 +1608,7 @@ public class BlockManager { } if (res != MisReplicationResult.POSTPONE) { it.remove(); - postponedMisreplicatedBlocksCount--; + postponedMisreplicatedBlocksCount.decrementAndGet(); } } } @@ -2405,7 +2445,7 @@ assert storedBlock.findDatanode(dn) < 0 excessReplicateMap.put(dn.getStorageID(), excessBlocks); } if (excessBlocks.add(block)) { - excessBlocksCount++; + excessBlocksCount.incrementAndGet(); if(blockLog.isDebugEnabled()) { blockLog.debug("BLOCK* addToExcessReplicate:" + " (" + dn + ", " + block @@ -2453,7 +2493,7 @@ assert storedBlock.findDatanode(dn) < 0 .getStorageID()); if (excessBlocks != null) { if (excessBlocks.remove(block)) { - excessBlocksCount--; + excessBlocksCount.decrementAndGet(); if(blockLog.isDebugEnabled()) { blockLog.debug("BLOCK* removeStoredBlock: " + block + " is removed from excessBlocks"); @@ -2798,7 +2838,7 @@ assert storedBlock.findDatanode(dn) < 0 // Remove the block from pendingReplications pendingReplications.remove(block); if (postponedMisreplicatedBlocks.remove(block)) { - postponedMisreplicatedBlocksCount--; + postponedMisreplicatedBlocksCount.decrementAndGet(); } } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Mon Jan 14 03:44:35 2013 @@ -236,13 
+236,18 @@ public class BlockPlacementPolicyDefault + totalReplicasExpected + "\n" + e.getMessage()); if (avoidStaleNodes) { - // excludedNodes now has - initial excludedNodes, any nodes that were - // chosen and nodes that were tried but were not chosen because they - // were stale, decommissioned or for any other reason a node is not - // chosen for write. Retry again now not avoiding stale node + // Retry chooseTarget again, this time not avoiding stale nodes. + + // excludedNodes contains the initial excludedNodes and nodes that were + // not chosen because they were stale, decommissioned, etc. + // We need to additionally exclude the nodes that were added to the + // result list in the successful calls to choose*() above. for (Node node : results) { oldExcludedNodes.put(node, node); } + // Set numOfReplicas, since it can get out of sync with the result list + // if the NotEnoughReplicasException was thrown in chooseRandom(). + numOfReplicas = totalReplicasExpected - results.size(); return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize, maxNodesPerRack, results, false); } @@ -542,7 +547,7 @@ public class BlockPlacementPolicyDefault if (LOG.isDebugEnabled()) { threadLocalBuilder.get().append(node.toString()).append(": ") .append("Node ").append(NodeBase.getPath(node)) - .append(" is not chosen because the node is staled "); + .append(" is not chosen because the node is stale "); } return false; } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java Mon Jan 14 03:44:35 2013 @@ -17,19 +17,18 @@ */ package org.apache.hadoop.hdfs.server.common; -import java.util.concurrent.atomic.AtomicLong; - import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.util.SequentialNumber; /**************************************************************** * A GenerationStamp is a Hadoop FS primitive, identified by a long. ****************************************************************/ @InterfaceAudience.Private -public class GenerationStamp implements Comparable { +public class GenerationStamp extends SequentialNumber { /** - * The first valid generation stamp. + * The last reserved generation stamp. */ - public static final long FIRST_VALID_STAMP = 1000L; + public static final long LAST_RESERVED_STAMP = 1000L; /** * Generation stamp of blocks that pre-date the introduction @@ -37,62 +36,10 @@ public class GenerationStamp implements */ public static final long GRANDFATHER_GENERATION_STAMP = 0; - private AtomicLong genstamp = new AtomicLong(); - /** - * Create a new instance, initialized to FIRST_VALID_STAMP. + * Create a new instance, initialized to {@link #LAST_RESERVED_STAMP}. */ public GenerationStamp() { - this(GenerationStamp.FIRST_VALID_STAMP); - } - - /** - * Create a new instance, initialized to the specified value. 
- */ - GenerationStamp(long stamp) { - genstamp.set(stamp); - } - - /** - * Returns the current generation stamp - */ - public long getStamp() { - return genstamp.get(); - } - - /** - * Sets the current generation stamp - */ - public void setStamp(long stamp) { - genstamp.set(stamp); - } - - /** - * First increments the counter and then returns the stamp - */ - public long nextStamp() { - return genstamp.incrementAndGet(); - } - - @Override // Comparable - public int compareTo(GenerationStamp that) { - long stamp1 = this.genstamp.get(); - long stamp2 = that.genstamp.get(); - return stamp1 < stamp2 ? -1 : - stamp1 > stamp2 ? 1 : 0; - } - - @Override // Object - public boolean equals(Object o) { - if (!(o instanceof GenerationStamp)) { - return false; - } - return compareTo((GenerationStamp)o) == 0; - } - - @Override // Object - public int hashCode() { - long stamp = genstamp.get(); - return (int) (stamp^(stamp>>>32)); + super(LAST_RESERVED_STAMP); } } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Mon Jan 14 03:44:35 2013 @@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.web.resour import org.apache.hadoop.hdfs.web.resources.DoAsParam; import org.apache.hadoop.hdfs.web.resources.UserParam; import org.apache.hadoop.http.HtmlQuoting; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import 
org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; @@ -69,6 +70,8 @@ import org.apache.hadoop.security.author import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.VersionInfo; +import com.google.common.base.Charsets; + import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; @@ -178,7 +181,7 @@ public class JspHelper { s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT); } catch (IOException e) { deadNodes.add(chosenNode); - s.close(); + IOUtils.closeSocket(s); s = null; failures++; } @@ -228,7 +231,7 @@ public class JspHelper { } blockReader = null; s.close(); - out.print(HtmlQuoting.quoteHtmlChars(new String(buf))); + out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8))); } public static void addTableHeader(JspWriter out) throws IOException { @@ -382,6 +385,8 @@ public class JspHelper { int dint = d1.getVolumeFailures() - d2.getVolumeFailures(); ret = (dint < 0) ? -1 : ((dint > 0) ? 1 : 0); break; + default: + throw new IllegalArgumentException("Invalid sortField"); } return (sortOrder == SORT_ORDER_DSC) ? 
-ret : ret; } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Mon Jan 14 03:44:35 2013 @@ -44,6 +44,8 @@ import org.apache.hadoop.util.VersionInf import com.google.common.base.Preconditions; +import com.google.common.base.Charsets; + /** @@ -658,7 +660,7 @@ public abstract class Storage extends St FileLock res = null; try { res = file.getChannel().tryLock(); - file.write(jvmName.getBytes()); + file.write(jvmName.getBytes(Charsets.UTF_8)); LOG.info("Lock on " + lockF + " acquired by nodename " + jvmName); } catch(OverlappingFileLockException oe) { LOG.error("It appears that another namenode " + file.readLine() Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original) +++ 
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Mon Jan 14 03:44:35 2013 @@ -703,7 +703,7 @@ class BlockPoolSliceScanner { (info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" : "none"; buffer.append(String.format("%-26s : status : %-6s type : %-6s" + " scan time : " + - "%-15d %s\n", info.block, + "%-15d %s%n", info.block, (info.lastScanOk ? "ok" : "failed"), scanType, scanTime, (scanTime <= 0) ? "not yet verified" : @@ -716,21 +716,21 @@ class BlockPoolSliceScanner { double pctProgress = (totalBytesToScan == 0) ? 100 : (totalBytesToScan-bytesLeft)*100.0/totalBytesToScan; - buffer.append(String.format("\nTotal Blocks : %6d" + - "\nVerified in last hour : %6d" + - "\nVerified in last day : %6d" + - "\nVerified in last week : %6d" + - "\nVerified in last four weeks : %6d" + - "\nVerified in SCAN_PERIOD : %6d" + - "\nNot yet verified : %6d" + - "\nVerified since restart : %6d" + - "\nScans since restart : %6d" + - "\nScan errors since restart : %6d" + - "\nTransient scan errors : %6d" + - "\nCurrent scan rate limit KBps : %6d" + - "\nProgress this period : %6.0f%%" + - "\nTime left in cur period : %6.2f%%" + - "\n", + buffer.append(String.format("%nTotal Blocks : %6d" + + "%nVerified in last hour : %6d" + + "%nVerified in last day : %6d" + + "%nVerified in last week : %6d" + + "%nVerified in last four weeks : %6d" + + "%nVerified in SCAN_PERIOD : %6d" + + "%nNot yet verified : %6d" + + "%nVerified since restart : %6d" + + "%nScans since restart : %6d" + + "%nScan errors since restart : %6d" + + "%nTransient scan errors : %6d" + + "%nCurrent scan rate limit KBps : %6d" + + "%nProgress this period : %6.0f%%" + + "%nTime left in cur period : %6.2f%%" + + "%n", total, inOneHour, inOneDay, inOneWeek, inFourWeeks, inScanPeriod, neverScanned, totalScans, totalScans, Modified: 
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Mon Jan 14 03:44:35 2013 @@ -78,6 +78,10 @@ public class BlockPoolSliceStorage exten this.clusterID = clusterId; } + private BlockPoolSliceStorage() { + super(NodeType.DATA_NODE); + } + /** * Analyze storage directories. Recover from previous transitions if required. 
* @@ -378,7 +382,7 @@ public class BlockPoolSliceStorage exten if (!prevDir.exists()) return; // read attributes out of the VERSION file of previous directory - DataStorage prevInfo = new DataStorage(); + BlockPoolSliceStorage prevInfo = new BlockPoolSliceStorage(); prevInfo.readPreviousVersionProperties(bpSd); // We allow rollback to a state, which is either consistent with Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Mon Jan 14 03:44:35 2013 @@ -648,7 +648,7 @@ class BlockSender implements java.io.Clo ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize); - while (endOffset > offset) { + while (endOffset > offset && !Thread.currentThread().isInterrupted()) { manageOsCache(); long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo, throttler); @@ -656,16 +656,19 @@ class BlockSender implements java.io.Clo totalRead += len + (numberOfChunks(len) * checksumSize); seqno++; } - try { - // send an empty packet to mark the end of the block - sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo, - throttler); - out.flush(); - } catch (IOException e) { //socket error - throw ioeToSocketException(e); - } + // If this thread was interrupted, then it did not send the full block. 
+ if (!Thread.currentThread().isInterrupted()) { + try { + // send an empty packet to mark the end of the block + sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo, + throttler); + out.flush(); + } catch (IOException e) { //socket error + throw ioeToSocketException(e); + } - sentEntireByteRange = true; + sentEntireByteRange = true; + } } finally { if (clientTraceFmt != null) { final long endTime = System.nanoTime(); Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Jan 14 03:44:35 2013 @@ -98,7 +98,6 @@ import org.apache.hadoop.hdfs.protocol.D import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; @@ -115,6 +114,7 @@ import org.apache.hadoop.hdfs.protocolPB import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB; +import 
org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; @@ -970,29 +970,27 @@ public class DataNode extends Configured dnId.setStorageID(createNewStorageId(dnId.getXferPort())); } + /** + * @return a unique storage ID of form "DS-randInt-ipaddr-port-timestamp" + */ static String createNewStorageId(int port) { - /* Return - * "DS-randInt-ipaddr-currentTimeMillis" - * It is considered extermely rare for all these numbers to match - * on a different machine accidentally for the following - * a) SecureRandom(INT_MAX) is pretty much random (1 in 2 billion), and - * b) Good chance ip address would be different, and - * c) Even on the same machine, Datanode is designed to use different ports. - * d) Good chance that these are started at different times. - * For a confict to occur all the 4 above have to match!. - * The format of this string can be changed anytime in future without - * affecting its functionality. - */ + // It is unlikely that we will create a non-unique storage ID + // for the following reasons: + // a) SecureRandom is a cryptographically strong random number generator + // b) IP addresses will likely differ on different hosts + // c) DataNode xfer ports will differ on the same host + // d) StorageIDs will likely be generated at different times (in ms) + // A conflict requires that all four conditions are violated. + // NB: The format of this string can be changed in the future without + // requiring that old StorageIDs be updated. 
String ip = "unknownIP"; try { ip = DNS.getDefaultIP("default"); } catch (UnknownHostException ignored) { - LOG.warn("Could not find ip address of \"default\" inteface."); + LOG.warn("Could not find an IP address for the \"default\" inteface."); } - int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE); - return "DS-" + rand + "-" + ip + "-" + port + "-" - + Time.now(); + return "DS-" + rand + "-" + ip + "-" + port + "-" + Time.now(); } /** Ensure the authentication method is kerberos */ @@ -1468,7 +1466,7 @@ public class DataNode extends Configured // read ack if (isClient) { DNTransferAckProto closeAck = DNTransferAckProto.parseFrom( - HdfsProtoUtil.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); if (LOG.isDebugEnabled()) { LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck); } @@ -1908,10 +1906,11 @@ public class DataNode extends Configured } /** - * Get namenode corresponding to a block pool + * Get the NameNode corresponding to the given block pool. + * * @param bpid Block pool Id * @return Namenode corresponding to the bpid - * @throws IOException + * @throws IOException if unable to get the corresponding NameNode */ public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid) throws IOException { @@ -1935,11 +1934,6 @@ public class DataNode extends Configured final String bpid = block.getBlockPoolId(); DatanodeProtocolClientSideTranslatorPB nn = getActiveNamenodeForBP(block.getBlockPoolId()); - if (nn == null) { - throw new IOException( - "Unable to synchronize block " + rBlock + ", since this DN " - + " has not acknowledged any NN as active."); - } long recoveryId = rBlock.getNewGenerationStamp(); if (LOG.isDebugEnabled()) { Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Mon Jan 14 03:44:35 2013 @@ -62,7 +62,7 @@ import org.apache.hadoop.util.DiskChecke */ @InterfaceAudience.Private public class DataStorage extends Storage { - // Constants + public final static String BLOCK_SUBDIR_PREFIX = "subdir"; final static String BLOCK_FILE_PREFIX = "blk_"; final static String COPY_FILE_PREFIX = "dncp_"; @@ -71,13 +71,13 @@ public class DataStorage extends Storage public final static String STORAGE_DIR_FINALIZED = "finalized"; public final static String STORAGE_DIR_TMP = "tmp"; - /** Access to this variable is guarded by "this" */ + /** Unique storage ID. See {@link DataNode#createNewStorageId(int)} for details. */ private String storageID; - // flag to ensure initialzing storage occurs only once - private boolean initilized = false; + // Flag to ensure we only initialize storage once + private boolean initialized = false; - // BlockPoolStorage is map of + // Maps block pool IDs to block pool storage private Map bpStorageMap = Collections.synchronizedMap(new HashMap()); @@ -130,7 +130,7 @@ public class DataStorage extends Storage synchronized void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo, Collection dataDirs, StartupOption startOpt) throws IOException { - if (initilized) { + if (initialized) { // DN storage has been initialized, no need to do anything return; } @@ -200,7 +200,7 @@ public class DataStorage extends Storage this.writeAll(); // 4. 
mark DN storage is initilized - this.initilized = true; + this.initialized = true; } /** Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Jan 14 03:44:35 2013 @@ -42,7 +42,6 @@ import org.apache.commons.logging.Log; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; @@ -56,6 +55,7 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -144,7 +144,7 @@ class DataXceiver extends 
Receiver imple /** Return the datanode object. */ DataNode getDataNode() {return datanode;} - private OutputStream getOutputStream() throws IOException { + private OutputStream getOutputStream() { return socketOut; } @@ -284,7 +284,7 @@ class DataXceiver extends Receiver imple // to respond with a Status enum. try { ClientReadStatusProto stat = ClientReadStatusProto.parseFrom( - HdfsProtoUtil.vintPrefixed(in)); + PBHelper.vintPrefixed(in)); if (!stat.hasStatus()) { LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " + "code after reading. Will close connection."); @@ -445,7 +445,7 @@ class DataXceiver extends Receiver imple // read connect ack (only for clients, not for replication req) if (isClient) { BlockOpResponseProto connectAck = - BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn)); + BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn)); mirrorInStatus = connectAck.getStatus(); firstBadLink = connectAck.getFirstBadLink(); if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) { @@ -606,7 +606,7 @@ class DataXceiver extends Receiver imple .setBytesPerCrc(bytesPerCRC) .setCrcPerBlock(crcPerBlock) .setMd5(ByteString.copyFrom(md5.getDigest())) - .setCrcType(HdfsProtoUtil.toProto(checksum.getChecksumType())) + .setCrcType(PBHelper.convert(checksum.getChecksumType())) ) .build() .writeDelimitedTo(out); @@ -765,7 +765,7 @@ class DataXceiver extends Receiver imple // receive the response from the proxy BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom( - HdfsProtoUtil.vintPrefixed(proxyReply)); + PBHelper.vintPrefixed(proxyReply)); if (copyResponse.getStatus() != SUCCESS) { if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) { Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Mon Jan 14 03:44:35 2013 @@ -79,9 +79,6 @@ public class DatanodeJspHelper { .getCanonicalHostName(); } - private static final SimpleDateFormat lsDateFormat = - new SimpleDateFormat("yyyy-MM-dd HH:mm"); - /** * Get the default chunk size. * @param conf the configuration @@ -205,8 +202,8 @@ public class DatanodeJspHelper { + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr); cols[0] = "" + HtmlQuoting.quoteHtmlChars(localFileName) + ""; - cols[5] = lsDateFormat.format(new Date((files[i] - .getModificationTime()))); + cols[5] = new SimpleDateFormat("yyyy-MM-dd HH:mm").format( + new Date((files[i].getModificationTime()))); cols[6] = files[i].getPermission().toString(); cols[7] = files[i].getOwner(); cols[8] = files[i].getGroup(); Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original) +++ 
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Mon Jan 14 03:44:35 2013 @@ -203,9 +203,6 @@ abstract public class ReplicaInfo extend throw new IOException("detachBlock:Block not found. " + this); } File meta = getMetaFile(); - if (meta == null) { - throw new IOException("Meta file not found for block " + this); - } if (HardLink.getLinkCount(file) > numLinks) { DataNode.LOG.info("CopyOnWrite for block " + this); Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java Mon Jan 14 03:44:35 2013 @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; +import java.io.FileDescriptor; +import java.io.FileInputStream; import java.io.InputStream; import org.apache.hadoop.io.IOUtils; @@ -30,9 +32,9 @@ public class ReplicaInputStreams impleme private final InputStream checksumIn; /** Create an object with a data input stream and a checksum input stream. 
*/ - public ReplicaInputStreams(InputStream dataIn, InputStream checksumIn) { - this.dataIn = dataIn; - this.checksumIn = checksumIn; + public ReplicaInputStreams(FileDescriptor dataFd, FileDescriptor checksumFd) { + this.dataIn = new FileInputStream(dataFd); + this.checksumIn = new FileInputStream(checksumFd); } /** @return the data input stream. */ Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Jan 14 03:44:35 2013 @@ -390,8 +390,7 @@ class FsDatasetImpl implements FsDataset if (ckoff > 0) { metaInFile.seek(ckoff); } - return new ReplicaInputStreams(new FileInputStream(blockInFile.getFD()), - new FileInputStream(metaInFile.getFD())); + return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD()); } static File moveBlockFiles(Block b, File srcfile, File destdir Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java?rev=1432796&r1=1432795&r2=1432796&view=diff 
============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java Mon Jan 14 03:44:35 2013 @@ -19,16 +19,20 @@ package org.apache.hadoop.hdfs.server.da import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; -import java.io.FileReader; import java.io.IOException; -import java.io.PrintStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; +import com.google.common.base.Charsets; + class RollingLogsImpl implements RollingLogs { private static final String CURR_SUFFIX = ".curr"; private static final String PREV_SUFFIX = ".prev"; @@ -40,7 +44,7 @@ class RollingLogsImpl implements Rolling private final File curr; private final File prev; - private PrintStream out; //require synchronized access + private PrintWriter out; //require synchronized access private Appender appender = new Appender() { @Override @@ -82,7 +86,8 @@ class RollingLogsImpl implements Rolling RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{ curr = new File(dir, filePrefix + CURR_SUFFIX); prev = new File(dir, filePrefix + PREV_SUFFIX); - out = new PrintStream(new FileOutputStream(curr, true)); + out = new PrintWriter(new OutputStreamWriter(new FileOutputStream( + curr, true), Charsets.UTF_8)); } @Override @@ -108,7 +113,8 @@ class RollingLogsImpl implements Rolling synchronized(this) { appender.close(); final 
boolean renamed = curr.renameTo(prev); - out = new PrintStream(new FileOutputStream(curr, true)); + out = new PrintWriter(new OutputStreamWriter(new FileOutputStream( + curr, true), Charsets.UTF_8)); if (!renamed) { throw new IOException("Failed to rename " + curr + " to " + prev); } @@ -163,7 +169,8 @@ class RollingLogsImpl implements Rolling reader = null; } - reader = new BufferedReader(new FileReader(file)); + reader = new BufferedReader(new InputStreamReader(new FileInputStream( + file), Charsets.UTF_8)); return true; } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java Mon Jan 14 03:44:35 2013 @@ -48,6 +48,8 @@ import org.codehaus.jackson.map.ObjectMa import org.codehaus.jackson.type.TypeReference; import org.znerd.xmlenc.XMLOutputter; +import com.google.common.base.Charsets; + /** * This class generates the data that is needed to be displayed on cluster web * console. 
@@ -873,7 +875,7 @@ class ClusterJspHelper { URLConnection connection = url.openConnection(); BufferedReader in = new BufferedReader( new InputStreamReader( - connection.getInputStream())); + connection.getInputStream(), Charsets.UTF_8)); String inputLine; while ((inputLine = in.readLine()) != null) { out.append(inputLine); Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Mon Jan 14 03:44:35 2013 @@ -21,6 +21,8 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.Arrays; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.server.common.Storage; @@ -41,6 +43,7 @@ import org.apache.hadoop.security.UserGr * int, int, byte[]) */ class EditLogBackupOutputStream extends EditLogOutputStream { + private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class); static int DEFAULT_BUFFER_SIZE = 256; private final JournalProtocol backupNode; // RPC proxy to backup node @@ -117,6 +120,11 @@ class EditLogBackupOutputStream extends protected void flushAndSync(boolean durable) throws IOException { assert out.getLength() == 0 : "Output buffer is not empty"; + if 
(doubleBuf.isFlushed()) { + LOG.info("Nothing to flush"); + return; + } + int numReadyTxns = doubleBuf.countReadyTxns(); long firstTxToFlush = doubleBuf.getFirstReadyTxId(); Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Mon Jan 14 03:44:35 2013 @@ -140,10 +140,8 @@ public class EditLogFileOutputStream ext fc.close(); fc = null; } - if (fp != null) { - fp.close(); - fp = null; - } + fp.close(); + fp = null; } finally { IOUtils.cleanup(FSNamesystem.LOG, fc, fp); doubleBuf = null; Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Jan 14 03:44:35 2013 @@ -81,8 +81,9 @@ import 
com.google.common.base.Preconditi public class FSDirectory implements Closeable { private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) { final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota( + namesystem.allocateNewInodeId(), INodeDirectory.ROOT_NAME, - namesystem.createFsOwnerPermissions(new FsPermission((short)0755))); + namesystem.createFsOwnerPermissions(new FsPermission((short) 0755))); final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r); s.setSnapshotQuota(0); return s; @@ -262,7 +263,9 @@ public class FSDirectory implements Clos if (!mkdirs(parent.toString(), permissions, true, modTime)) { return null; } + long id = namesystem.allocateNewInodeId(); INodeFileUnderConstruction newNode = new INodeFileUnderConstruction( + id, permissions,replication, preferredBlockSize, modTime, clientName, clientMachine, clientNode); @@ -284,7 +287,8 @@ public class FSDirectory implements Clos return newNode; } - INode unprotectedAddFile( String path, + INode unprotectedAddFile( long id, + String path, PermissionStatus permissions, short replication, long modificationTime, @@ -296,13 +300,11 @@ public class FSDirectory implements Clos final INode newNode; assert hasWriteLock(); if (underConstruction) { - newNode = new INodeFileUnderConstruction( - permissions, replication, - preferredBlockSize, modificationTime, clientName, - clientMachine, null); + newNode = new INodeFileUnderConstruction(id, permissions, replication, + preferredBlockSize, modificationTime, clientName, clientMachine, null); } else { - newNode = new INodeFile(permissions, BlockInfo.EMPTY_ARRAY, replication, - modificationTime, atime, preferredBlockSize); + newNode = new INodeFile(id, permissions, BlockInfo.EMPTY_ARRAY, + replication, modificationTime, atime, preferredBlockSize); } try { @@ -399,19 +401,16 @@ public class FSDirectory implements Clos /** * Remove a block from the file. 
*/ - boolean removeBlock(String path, INodeFileUnderConstruction fileNode, + void removeBlock(String path, INodeFileUnderConstruction fileNode, Block block) throws IOException { waitForReady(); writeLock(); try { unprotectedRemoveBlock(path, fileNode, block); - // write modified block locations to log - fsImage.getEditLog().logOpenFile(path, fileNode); } finally { writeUnlock(); } - return true; } void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode, @@ -1634,8 +1633,9 @@ public class FSDirectory implements Clos // create directories beginning from the first null index for(; i < inodes.length; i++) { pathbuilder.append(Path.SEPARATOR + names[i]); - unprotectedMkdir(inodesInPath, i, components[i], - (i < lastInodeIndex) ? parentPermissions : permissions, now); + unprotectedMkdir(namesystem.allocateNewInodeId(), inodesInPath, i, + components[i], (i < lastInodeIndex) ? parentPermissions + : permissions, now); if (inodes[i] == null) { return false; } @@ -1657,7 +1657,7 @@ public class FSDirectory implements Clos return true; } - INode unprotectedMkdir(String src, PermissionStatus permissions, + INode unprotectedMkdir(long inodeId, String src, PermissionStatus permissions, long timestamp) throws QuotaExceededException, UnresolvedLinkException { assert hasWriteLock(); @@ -1666,7 +1666,8 @@ public class FSDirectory implements Clos components.length, false); INode[] inodes = inodesInPath.getINodes(); final int pos = inodes.length - 1; - unprotectedMkdir(inodesInPath, pos, components[pos], permissions, timestamp); + unprotectedMkdir(inodeId, inodesInPath, pos, components[pos], permissions, + timestamp); return inodes[pos]; } @@ -1674,11 +1675,12 @@ public class FSDirectory implements Clos * The parent path to the directory is at [0, pos-1]. * All ancestors exist. Newly created one stored at index pos. 
*/ - private void unprotectedMkdir(INodesInPath inodesInPath, int pos, - byte[] name, PermissionStatus permission, - long timestamp) throws QuotaExceededException { + private void unprotectedMkdir(long inodeId, INodesInPath inodesInPath, + int pos, byte[] name, PermissionStatus permission, long timestamp) + throws QuotaExceededException { assert hasWriteLock(); - final INodeDirectory dir = new INodeDirectory(name, permission, timestamp); + final INodeDirectory dir = new INodeDirectory(inodeId, name, permission, + timestamp); if (addChild(inodesInPath, pos, dir, true)) { inodesInPath.setINode(pos, dir); } @@ -2248,9 +2250,10 @@ public class FSDirectory implements Clos } final String userName = dirPerms.getUserName(); INodeSymlink newNode = null; + long id = namesystem.allocateNewInodeId(); writeLock(); try { - newNode = unprotectedAddSymlink(path, target, modTime, modTime, + newNode = unprotectedAddSymlink(id, path, target, modTime, modTime, new PermissionStatus(userName, null, FsPermission.getDefault())); } finally { writeUnlock(); @@ -2270,12 +2273,13 @@ public class FSDirectory implements Clos /** * Add the specified path into the namespace. Invoked from edit log processing. */ - INodeSymlink unprotectedAddSymlink(String path, String target, long mtime, - long atime, PermissionStatus perm) + INodeSymlink unprotectedAddSymlink(long id, String path, String target, + long mtime, long atime, PermissionStatus perm) throws UnresolvedLinkException, QuotaExceededException { assert hasWriteLock(); - final INodeSymlink symlink = new INodeSymlink(target, mtime, atime, perm); - return addINode(path, symlink)? symlink: null; + final INodeSymlink symlink = new INodeSymlink(id, target, mtime, atime, + perm); + return addINode(path, symlink) ? 
symlink : null; } /** Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Mon Jan 14 03:44:35 2013 @@ -121,11 +121,8 @@ public class FSEditLogLoader { long lastTxId = in.getLastTxId(); long numTxns = (lastTxId - expectedStartingTxId) + 1; long lastLogTime = now(); - - if (LOG.isDebugEnabled()) { - LOG.debug("edit log length: " + in.length() + ", start txid: " - + expectedStartingTxId + ", last txid: " + lastTxId); - } + long lastInodeId = fsNamesys.getLastInodeId(); + try { while (true) { try { @@ -171,7 +168,10 @@ public class FSEditLogLoader { } } try { - applyEditLogOp(op, fsDir, in.getVersion()); + long inodeId = applyEditLogOp(op, fsDir, in.getVersion()); + if (lastInodeId < inodeId) { + lastInodeId = inodeId; + } } catch (Throwable e) { LOG.error("Encountered exception on operation " + op, e); MetaRecoveryContext.editLogLoaderPrompt("Failed to " + @@ -206,6 +206,7 @@ public class FSEditLogLoader { } } } finally { + fsNamesys.resetLastInodeId(lastInodeId); if(closeOnExit) { in.close(); } @@ -224,9 +225,9 @@ public class FSEditLogLoader { } @SuppressWarnings("deprecation") - private void applyEditLogOp(FSEditLogOp op, FSDirectory fsDir, + private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir, int logVersion) throws IOException { - + long inodeId = INodeId.GRANDFATHER_INODE_ID; 
if (LOG.isTraceEnabled()) { LOG.trace("replaying edit log: " + op); } @@ -257,11 +258,11 @@ public class FSEditLogLoader { assert addCloseOp.blocks.length == 0; // add to the file tree - newFile = (INodeFile)fsDir.unprotectedAddFile( - addCloseOp.path, addCloseOp.permissions, - replication, addCloseOp.mtime, - addCloseOp.atime, addCloseOp.blockSize, - true, addCloseOp.clientName, addCloseOp.clientMachine); + inodeId = fsNamesys.allocateNewInodeId(); + newFile = (INodeFile) fsDir.unprotectedAddFile(inodeId, + addCloseOp.path, addCloseOp.permissions, replication, + addCloseOp.mtime, addCloseOp.atime, addCloseOp.blockSize, true, + addCloseOp.clientName, addCloseOp.clientMachine); fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path); } else { // This is OP_ADD on an existing file @@ -374,7 +375,8 @@ public class FSEditLogLoader { } case OP_MKDIR: { MkdirOp mkdirOp = (MkdirOp)op; - fsDir.unprotectedMkdir(mkdirOp.path, mkdirOp.permissions, + inodeId = fsNamesys.allocateNewInodeId(); + fsDir.unprotectedMkdir(inodeId, mkdirOp.path, mkdirOp.permissions, mkdirOp.timestamp); break; } @@ -427,9 +429,10 @@ public class FSEditLogLoader { } case OP_SYMLINK: { SymlinkOp symlinkOp = (SymlinkOp)op; - fsDir.unprotectedAddSymlink(symlinkOp.path, symlinkOp.value, - symlinkOp.mtime, symlinkOp.atime, - symlinkOp.permissionStatus); + inodeId = fsNamesys.allocateNewInodeId(); + fsDir.unprotectedAddSymlink(inodeId, symlinkOp.path, + symlinkOp.value, symlinkOp.mtime, + symlinkOp.atime, symlinkOp.permissionStatus); break; } case OP_RENAME: { @@ -489,6 +492,7 @@ public class FSEditLogLoader { default: throw new IOException("Invalid operation read " + op.opCode); } + return inodeId; } private static String formatEditLogReplayError(EditLogInputStream in, Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Mon Jan 14 03:44:35 2013 @@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.server.pro import org.apache.hadoop.hdfs.util.Canceler; import org.apache.hadoop.hdfs.util.MD5FileUtils; import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.util.IdGenerator; import org.apache.hadoop.util.Time; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; @@ -92,6 +93,7 @@ public class FSImage implements Closeabl final private Configuration conf; protected NNStorageRetentionManager archivalManager; + protected IdGenerator blockIdGenerator; /** * Construct an FSImage @@ -137,6 +139,9 @@ public class FSImage implements Closeabl Preconditions.checkState(fileCount == 1, "FSImage.format should be called with an uninitialized namesystem, has " + fileCount + " files"); + // BlockIdGenerator is defined during formatting + // currently there is only one BlockIdGenerator + blockIdGenerator = createBlockIdGenerator(fsn); NamespaceInfo ns = NNStorage.newNamespaceInfo(); ns.clusterID = clusterId; @@ -253,6 +258,7 @@ public class FSImage implements Closeabl doRollback(); break; case REGULAR: + default: // just load the image } @@ -737,6 +743,9 @@ public class FSImage implements Closeabl FSImageFormat.Loader loader = new FSImageFormat.Loader( conf, target); loader.load(curFile); + // BlockIdGenerator is determined after loading image + // currently there is only one BlockIdGenerator + blockIdGenerator = 
createBlockIdGenerator(target); target.setBlockPoolId(this.getBlockPoolID()); // Check that the image digest we loaded matches up with what @@ -1165,4 +1174,12 @@ public class FSImage implements Closeabl public synchronized long getMostRecentCheckpointTxId() { return storage.getMostRecentCheckpointTxId(); } + + public long getUniqueBlockId() { + return blockIdGenerator.nextValue(); + } + + public IdGenerator createBlockIdGenerator(FSNamesystem fsn) { + return new RandomBlockIdGenerator(fsn); + } } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Mon Jan 14 03:44:35 2013 @@ -220,7 +220,8 @@ class FSImageFormat { in = compression.unwrapInputStream(fin); LOG.info("Loading image file " + curFile + " using " + compression); - + // reset INodeId. 
TODO: remove this after inodeId is persisted in fsimage + namesystem.resetLastInodeIdWithoutChecking(INodeId.LAST_RESERVED_ID); // load all inodes LOG.info("Number of files = " + numFiles); if (LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION, @@ -400,6 +401,8 @@ class FSImageFormat { long blockSize = 0; int imgVersion = getLayoutVersion(); + long inodeId = namesystem.allocateNewInodeId(); + short replication = in.readShort(); replication = namesystem.getBlockManager().adjustReplication(replication); modificationTime = in.readLong(); @@ -437,7 +440,7 @@ class FSImageFormat { PermissionStatus permissions = PermissionStatus.read(in); - return INode.newINode(permissions, blocks, symlink, replication, + return INode.newINode(inodeId, permissions, blocks, symlink, replication, modificationTime, atime, nsQuota, dsQuota, blockSize); } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Mon Jan 14 03:44:35 2013 @@ -107,7 +107,9 @@ public class FSImageSerialization { int numLocs = in.readInt(); assert numLocs == 0 : "Unexpected block locations"; - return new INodeFileUnderConstruction(name, + //TODO: get inodeId from fsimage after inodeId is persisted + return new INodeFileUnderConstruction(INodeId.GRANDFATHER_INODE_ID, + name, blockReplication, modificationTime, 
preferredBlockSize, Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Jan 14 03:44:35 2013 @@ -78,8 +78,9 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.FileNotFoundException; -import java.io.FileWriter; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.io.StringWriter; import java.lang.management.ManagementFactory; @@ -211,6 +212,7 @@ import org.apache.hadoop.util.VersionInf import org.mortbay.util.ajax.JSON; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -383,14 +385,43 @@ public class FSNamesystem implements Nam private final boolean haEnabled; + private INodeId inodeId; + + /** + * Set the last allocated inode id when fsimage or editlog is loaded. 
+ */ + public void resetLastInodeId(long newValue) throws IOException { + try { + inodeId.skipTo(newValue); + } catch(IllegalStateException ise) { + throw new IOException(ise); + } + } + + /** Should only be used for tests to reset to any value */ + void resetLastInodeIdWithoutChecking(long newValue) { + inodeId.setCurrentValue(newValue); + } + + /** @return the last inode ID. */ + public long getLastInodeId() { + return inodeId.getCurrentValue(); + } + + /** Allocate a new inode ID. */ + public long allocateNewInodeId() { + return inodeId.nextValue(); + } + /** * Clear all loaded data */ void clear() { dir.reset(); dtSecretManager.reset(); - generationStamp.setStamp(GenerationStamp.FIRST_VALID_STAMP); + generationStamp.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP); leaseManager.removeAllLeases(); + inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID); } @VisibleForTesting @@ -542,6 +573,8 @@ public class FSNamesystem implements Nam this.standbyShouldCheckpoint = conf.getBoolean( DFS_HA_STANDBY_CHECKPOINTS_KEY, DFS_HA_STANDBY_CHECKPOINTS_DEFAULT); + this.inodeId = new INodeId(); + // For testing purposes, allow the DT secret manager to be started regardless // of whether security is enabled. 
alwaysUseDelegationTokensForTests = conf.getBoolean( @@ -1067,8 +1100,8 @@ public class FSNamesystem implements Nam try { checkSuperuserPrivilege(); File file = new File(System.getProperty("hadoop.log.dir"), filename); - PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file, - true))); + PrintWriter out = new PrintWriter(new BufferedWriter( + new OutputStreamWriter(new FileOutputStream(file, true), Charsets.UTF_8))); metaSave(out); out.flush(); out.close(); @@ -2520,13 +2553,9 @@ public class FSNamesystem implements Nam private Block allocateBlock(String src, INodesInPath inodesInPath, DatanodeDescriptor targets[]) throws IOException { assert hasWriteLock(); - Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0); - while(isValidBlock(b)) { - b.setBlockId(DFSUtil.getRandom().nextLong()); - } + Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0); // Increment the generation stamp for every new block. - nextGenerationStamp(); - b.setGenerationStamp(getGenerationStamp()); + b.setGenerationStamp(nextGenerationStamp()); b = dir.addBlock(src, inodesInPath, b, targets); NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". " + blockPoolId + " " + b); @@ -4321,6 +4350,8 @@ public class FSNamesystem implements Nam case SAFEMODE_ENTER: // enter safe mode enterSafeMode(false); break; + default: + LOG.error("Unexpected safe mode action"); } } return isInSafeMode(); @@ -4580,13 +4611,6 @@ public class FSNamesystem implements Nam } } - /** - * Returns whether the given block is one pointed-to by a file. 
- */ - private boolean isValidBlock(Block b) { - return (blockManager.getBlockCollection(b) != null); - } - PermissionStatus createFsOwnerPermissions(FsPermission permission) { return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission); } @@ -4796,14 +4820,14 @@ public class FSNamesystem implements Nam * Sets the generation stamp for this filesystem */ void setGenerationStamp(long stamp) { - generationStamp.setStamp(stamp); + generationStamp.setCurrentValue(stamp); } /** * Gets the generation stamp for this filesystem */ long getGenerationStamp() { - return generationStamp.getStamp(); + return generationStamp.getCurrentValue(); } /** @@ -4815,7 +4839,7 @@ public class FSNamesystem implements Nam throw new SafeModeException( "Cannot get next generation stamp", safeMode); } - long gs = generationStamp.nextStamp(); + final long gs = generationStamp.nextValue(); getEditLog().logGenerationStamp(gs); // NB: callers sync the log return gs; Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Mon Jan 14 03:44:35 2013 @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.util.Arrays; @@ -143,6 +144,11 @@ public abstract class INode implements C } /** + * The inode id + */ + final private long id; + + /** * 
The inode name is in java UTF8 encoding; * The name in HdfsFileStatus should keep the same encoding as this. * if this encoding is changed, implicitly getFileInfo and listStatus in @@ -161,8 +167,9 @@ public abstract class INode implements C private long modificationTime = 0L; private long accessTime = 0L; - private INode(byte[] name, long permission, INodeDirectory parent, + private INode(long id, byte[] name, long permission, INodeDirectory parent, long modificationTime, long accessTime) { + this.id = id; this.name = name; this.permission = permission; this.parent = parent; @@ -170,26 +177,31 @@ public abstract class INode implements C this.accessTime = accessTime; } - INode(byte[] name, PermissionStatus permissions, INodeDirectory parent, - long modificationTime, long accessTime) { - this(name, PermissionStatusFormat.toLong(permissions), parent, + INode(long id, byte[] name, PermissionStatus permissions, + INodeDirectory parent, long modificationTime, long accessTime) { + this(id, name, PermissionStatusFormat.toLong(permissions), parent, modificationTime, accessTime); } - - INode(PermissionStatus permissions, long mtime, long atime) { - this(null, permissions, null, mtime, atime); + + INode(long id, PermissionStatus permissions, long mtime, long atime) { + this(id, null, PermissionStatusFormat.toLong(permissions), null, mtime, atime); } - - protected INode(String name, PermissionStatus permissions) { - this(DFSUtil.string2Bytes(name), permissions, null, 0L, 0L); + + protected INode(long id, String name, PermissionStatus permissions) { + this(id, DFSUtil.string2Bytes(name), permissions, null, 0L, 0L); } /** @param other Other node to be copied */ INode(INode other) { - this(other.name, other.permission, other.parent, + this(other.id, other.name, other.permission, other.parent, other.modificationTime, other.accessTime); } + /** Get inode id */ + public long getId() { + return this.id; + } + /** * Create a copy of this inode for snapshot. 
* @@ -598,6 +610,7 @@ public abstract class INode implements C /** * Create an INode; the inode's name is not set yet * + * @param id preassigned inode id * @param permissions permissions * @param blocks blocks if a file * @param symlink symblic link if a symbolic link @@ -609,7 +622,8 @@ public abstract class INode implements C * @param preferredBlockSize block size * @return an inode */ - static INode newINode(PermissionStatus permissions, + static INode newINode(long id, + PermissionStatus permissions, BlockInfo[] blocks, String symlink, short replication, @@ -619,17 +633,17 @@ public abstract class INode implements C long dsQuota, long preferredBlockSize) { if (symlink.length() != 0) { // check if symbolic link - return new INodeSymlink(symlink, modificationTime, atime, permissions); + return new INodeSymlink(id, symlink, modificationTime, atime, permissions); } else if (blocks == null) { //not sym link and blocks null? directory! if (nsQuota >= 0 || dsQuota >= 0) { return new INodeDirectoryWithQuota( - permissions, modificationTime, nsQuota, dsQuota); + id, permissions, modificationTime, nsQuota, dsQuota); } // regular directory - return new INodeDirectory(permissions, modificationTime); + return new INodeDirectory(id, permissions, modificationTime); } // file - return new INodeFile(permissions, blocks, replication, + return new INodeFile(id, permissions, blocks, replication, modificationTime, atime, preferredBlockSize); } Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- 
hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Mon Jan 14 03:44:35 2013 @@ -62,17 +62,17 @@ public class INodeDirectory extends INod private List children = null; - public INodeDirectory(String name, PermissionStatus permissions) { - super(name, permissions); + public INodeDirectory(long id, String name, PermissionStatus permissions) { + super(id, name, permissions); } - public INodeDirectory(PermissionStatus permissions, long mTime) { - super(permissions, mTime, 0); + public INodeDirectory(long id, PermissionStatus permissions, long mTime) { + super(id, permissions, mTime, 0); } - + /** constructor */ - INodeDirectory(byte[] name, PermissionStatus permissions, long mtime) { - super(name, permissions, null, mtime, 0L); + INodeDirectory(long id, byte[] name, PermissionStatus permissions, long mtime) { + super(id, name, permissions, null, mtime, 0L); } /** Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Mon Jan 14 03:44:35 2013 @@ -55,16 +55,16 @@ public class INodeDirectoryWithQuota ext } /** constructor with no 
quota verification */ - INodeDirectoryWithQuota(PermissionStatus permissions, long modificationTime, - long nsQuota, long dsQuota) { - super(permissions, modificationTime); + INodeDirectoryWithQuota(long id, PermissionStatus permissions, + long modificationTime, long nsQuota, long dsQuota) { + super(id, permissions, modificationTime); this.nsQuota = nsQuota; this.dsQuota = dsQuota; } /** constructor with no quota verification */ - INodeDirectoryWithQuota(String name, PermissionStatus permissions) { - super(name, permissions); + INodeDirectoryWithQuota(long id, String name, PermissionStatus permissions) { + super(id, name, permissions); } /** Get this directory's namespace quota Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1432796&r1=1432795&r2=1432796&view=diff ============================================================================== --- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original) +++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Mon Jan 14 03:44:35 2013 @@ -87,21 +87,21 @@ public class INodeFile extends INode imp private BlockInfo[] blocks; - INodeFile(PermissionStatus permissions, BlockInfo[] blklist, + INodeFile(long id, PermissionStatus permissions, BlockInfo[] blklist, short replication, long modificationTime, long atime, long preferredBlockSize) { - this(null, permissions, modificationTime, atime, blklist, replication, + this(id, null, permissions, modificationTime, atime, blklist, replication, preferredBlockSize); } - protected INodeFile(byte[] name, PermissionStatus permissions, long mtime, long atime, + 
protected INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime, BlockInfo[] blklist, short replication, long preferredBlockSize) { - super(name, permissions, null, mtime, atime); + super(id, name, permissions, null, mtime, atime); header = HeaderFormat.combineReplication(header, replication); header = HeaderFormat.combinePreferredBlockSize(header, preferredBlockSize); this.blocks = blklist; } - + protected INodeFile(INodeFile that) { super(that); this.header = that.header;