hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1432796 [2/4] - in /hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/contrib/bkjournal/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/...
Date: Mon, 14 Jan 2013 03:44:46 GMT
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java Mon Jan 14 03:44:35 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.util.Daemon;
 class NameNodeConnector {
   private static final Log LOG = Balancer.LOG;
   private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
+  private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
 
   final URI nameNodeUri;
   final String blockpoolID;
@@ -65,6 +66,8 @@ class NameNodeConnector {
   private final boolean encryptDataTransfer;
   private boolean shouldRun;
   private long keyUpdaterInterval;
+  // used for balancer
+  private int notChangedIterations = 0;
   private BlockTokenSecretManager blockTokenSecretManager;
   private Daemon keyupdaterthread; // AccessKeyUpdater thread
   private DataEncryptionKey encryptionKey;
@@ -119,6 +122,20 @@ class NameNodeConnector {
     }
   }
 
+  boolean shouldContinue(long dispatchBlockMoveBytes) {
+    if (dispatchBlockMoveBytes > 0) {
+      notChangedIterations = 0;
+    } else {
+      notChangedIterations++;
+      if (notChangedIterations >= MAX_NOT_CHANGED_ITERATIONS) {
+        System.out.println("No block has been moved for "
+            + notChangedIterations + " iterations. Exiting...");
+        return false;
+      }
+    }
+    return true;
+  }
+  
   /** Get an access token for a block. */
   Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
       ) throws IOException {

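The new shouldContinue tracker gives the balancer a termination condition: once MAX_NOT_CHANGED_ITERATIONS (five) consecutive passes dispatch zero bytes of block moves, the run exits instead of spinning. A minimal standalone sketch of the counter logic, with a hypothetical driver loop added for illustration (the real Balancer loop and its byte accounting are not part of this hunk):

    // Standalone sketch of the idle-iteration cutoff added above; the
    // driver loop in main() is invented for illustration only.
    class IdleCutoff {
      private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
      private int notChangedIterations = 0;

      boolean shouldContinue(long dispatchBlockMoveBytes) {
        if (dispatchBlockMoveBytes > 0) {
          notChangedIterations = 0;   // progress was made: reset the counter
        } else if (++notChangedIterations >= MAX_NOT_CHANGED_ITERATIONS) {
          return false;               // five idle passes in a row: stop
        }
        return true;
      }

      public static void main(String[] args) {
        IdleCutoff c = new IdleCutoff();
        long[] bytesMovedPerPass = {1024, 0, 0, 0, 0, 0};
        for (long moved : bytesMovedPerPass) {
          if (!c.shouldContinue(moved)) {
            System.out.println("No block has been moved for 5 iterations. Exiting...");
            break;
          }
        }
      }
    }
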
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Mon Jan 14 03:44:35 2013
@@ -33,6 +33,7 @@ import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -107,8 +108,8 @@ public class BlockManager {
   private volatile long corruptReplicaBlocksCount = 0L;
   private volatile long underReplicatedBlocksCount = 0L;
   private volatile long scheduledReplicationBlocksCount = 0L;
-  private volatile long excessBlocksCount = 0L;
-  private volatile long postponedMisreplicatedBlocksCount = 0L;
+  private AtomicLong excessBlocksCount = new AtomicLong(0L);
+  private AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
   
   /** Used by metrics */
   public long getPendingReplicationBlocksCount() {
@@ -132,11 +133,11 @@ public class BlockManager {
   }
   /** Used by metrics */
   public long getExcessBlocksCount() {
-    return excessBlocksCount;
+    return excessBlocksCount.get();
   }
   /** Used by metrics */
   public long getPostponedMisreplicatedBlocksCount() {
-    return postponedMisreplicatedBlocksCount;
+    return postponedMisreplicatedBlocksCount.get();
   }
   /** Used by metrics */
   public int getPendingDataNodeMessageCount() {
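Replacing the volatile counters with AtomicLong fixes a lost-update bug: count++ on a volatile field is a separate read, add, and write, so concurrent callers can overwrite each other's increments; volatile guarantees visibility, not atomicity. A standalone demonstration (not Hadoop code):

    import java.util.concurrent.atomic.AtomicLong;

    // Two threads increment both counters 100000 times each. The volatile
    // counter typically ends below 200000; the AtomicLong is always exact.
    public class LostUpdateDemo {
      static volatile long volatileCount = 0;
      static final AtomicLong atomicCount = new AtomicLong(0);

      public static void main(String[] args) throws InterruptedException {
        Runnable work = new Runnable() {
          @Override
          public void run() {
            for (int i = 0; i < 100000; i++) {
              volatileCount++;               // read-add-write: not atomic
              atomicCount.incrementAndGet(); // single atomic operation
            }
          }
        };
        Thread t1 = new Thread(work);
        Thread t2 = new Thread(work);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(volatileCount + " vs " + atomicCount.get());
      }
    }
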
@@ -170,29 +171,34 @@ public class BlockManager {
    */
   private final Set<Block> postponedMisreplicatedBlocks = Sets.newHashSet();
 
-  //
-  // Keeps a TreeSet for every named node. Each treeset contains
-  // a list of the blocks that are "extra" at that location. We'll
-  // eventually remove these extras.
-  // Mapping: StorageID -> TreeSet<Block>
-  //
+  /**
+   * Maps a StorageID to the set of blocks that are "extra" for this
+   * DataNode. We'll eventually remove these extras.
+   */
   public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
     new TreeMap<String, LightWeightLinkedSet<Block>>();
 
-  //
-  // Store set of Blocks that need to be replicated 1 or more times.
-  // We also store pending replication-orders.
-  //
+  /**
+   * Store set of Blocks that need to be replicated 1 or more times.
+   * We also store pending replication-orders.
+   */
   public final UnderReplicatedBlocks neededReplications = new UnderReplicatedBlocks();
+
   @VisibleForTesting
   final PendingReplicationBlocks pendingReplications;
 
   /** The maximum number of replicas allowed for a block */
   public final short maxReplication;
-  /** The maximum number of outgoing replication streams
-   *  a given node should have at one time 
-   */
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time considering all but the highest priority replications needed.
+    */
   int maxReplicationStreams;
+  /**
+   * The maximum number of outgoing replication streams a given node should have
+   * at one time.
+   */
+  int replicationStreamsHardLimit;
   /** Minimum copies needed or else write is disallowed */
   public final short minReplication;
   /** Default number of replicas */
@@ -263,9 +269,16 @@ public class BlockManager {
     this.minReplication = (short)minR;
     this.maxReplication = (short)maxR;
 
-    this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
-                                             DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
-    this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
+    this.maxReplicationStreams =
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
+    this.replicationStreamsHardLimit =
+        conf.getInt(
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+            DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
+    this.shouldCheckForEnoughRacks =
+        conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
+            ? false : true;
 
     this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
@@ -435,7 +448,8 @@ public class BlockManager {
     NumberReplicas numReplicas = new NumberReplicas();
     // source node returned is not used
     chooseSourceDatanode(block, containingNodes,
-        containingLiveReplicasNodes, numReplicas);
+        containingLiveReplicasNodes, numReplicas,
+        UnderReplicatedBlocks.LEVEL);
     assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
     int usableReplicas = numReplicas.liveReplicas() +
                          numReplicas.decommissionedReplicas();
@@ -1052,7 +1066,7 @@ public class BlockManager {
 
   private void postponeBlock(Block blk) {
     if (postponedMisreplicatedBlocks.add(blk)) {
-      postponedMisreplicatedBlocksCount++;
+      postponedMisreplicatedBlocksCount.incrementAndGet();
     }
   }
   
@@ -1145,11 +1159,12 @@ public class BlockManager {
             liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
             NumberReplicas numReplicas = new NumberReplicas();
             srcNode = chooseSourceDatanode(
-                block, containingNodes, liveReplicaNodes, numReplicas);
+                block, containingNodes, liveReplicaNodes, numReplicas,
+                priority);
             if(srcNode == null) { // block can not be replicated from any node
               LOG.debug("Block " + block + " cannot be repl from any node");
               continue;
-          }
+            }
 
             assert liveReplicaNodes.size() == numReplicas.liveReplicas();
             // do not schedule more if enough replicas is already pending
@@ -1339,16 +1354,34 @@ public class BlockManager {
    * since the former do not have write traffic and hence are less busy.
    * We do not use already decommissioned nodes as a source.
    * Otherwise we choose a random node among those that did not reach their
-   * replication limit.
+   * replication limits.  However, if the replication is of the highest priority
+   * and all nodes have reached their replication limits, we will choose a
+   * random node despite the replication limit.
    *
    * In addition form a list of all nodes containing the block
    * and calculate its replication numbers.
+   *
+   * @param block Block for which a replication source is needed
+   * @param containingNodes List to be populated with nodes found to contain the 
+   *                        given block
+   * @param nodesContainingLiveReplicas List to be populated with nodes found to
+   *                                    contain live replicas of the given block
+   * @param numReplicas NumberReplicas instance to be initialized with the 
+   *                                   counts of live, corrupt, excess, and
+   *                                   decommissioned replicas of the given
+   *                                   block.
+   * @param priority integer representing replication priority of the given
+   *                 block
+   * @return the DatanodeDescriptor of the chosen node from which to replicate
+   *         the given block
    */
-  private DatanodeDescriptor chooseSourceDatanode(
+   @VisibleForTesting
+   DatanodeDescriptor chooseSourceDatanode(
                                     Block block,
                                     List<DatanodeDescriptor> containingNodes,
                                     List<DatanodeDescriptor> nodesContainingLiveReplicas,
-                                    NumberReplicas numReplicas) {
+                                    NumberReplicas numReplicas,
+                                    int priority) {
     containingNodes.clear();
     nodesContainingLiveReplicas.clear();
     DatanodeDescriptor srcNode = null;
@@ -1377,8 +1410,15 @@ public class BlockManager {
       // If so, do not select the node as src node
       if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
         continue;
-      if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
+          && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+      {
         continue; // already reached replication limit
+      }
+      if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
+      {
+        continue;
+      }
       // the block must not be scheduled for removal on srcNode
       if(excessBlocks != null && excessBlocks.contains(block))
         continue;
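Together with the new replicationStreamsHardLimit field declared earlier in this diff, source selection now applies two thresholds: normal-priority replications skip any node already at maxReplicationStreams, while highest-priority replications may exceed that soft limit but never the hard limit. A condensed restatement of the check (method shape simplified; names follow the patch):

    // Two-tier limit from chooseSourceDatanode, as a pure function.
    class ReplicationLimits {
      static boolean eligibleAsSource(int scheduledStreams,
          boolean highestPriority, int maxReplicationStreams,
          int replicationStreamsHardLimit) {
        if (!highestPriority && scheduledStreams >= maxReplicationStreams) {
          return false;  // soft limit: applies to all but the highest priority
        }
        return scheduledStreams < replicationStreamsHardLimit;  // absolute cap
      }
    }
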
@@ -1558,7 +1598,7 @@ public class BlockManager {
               "in block map.");
         }
         it.remove();
-        postponedMisreplicatedBlocksCount--;
+        postponedMisreplicatedBlocksCount.decrementAndGet();
         continue;
       }
       MisReplicationResult res = processMisReplicatedBlock(bi);
@@ -1568,7 +1608,7 @@ public class BlockManager {
       }
       if (res != MisReplicationResult.POSTPONE) {
         it.remove();
-        postponedMisreplicatedBlocksCount--;
+        postponedMisreplicatedBlocksCount.decrementAndGet();
       }
     }
   }
@@ -2405,7 +2445,7 @@ assert storedBlock.findDatanode(dn) < 0 
       excessReplicateMap.put(dn.getStorageID(), excessBlocks);
     }
     if (excessBlocks.add(block)) {
-      excessBlocksCount++;
+      excessBlocksCount.incrementAndGet();
       if(blockLog.isDebugEnabled()) {
         blockLog.debug("BLOCK* addToExcessReplicate:"
             + " (" + dn + ", " + block
@@ -2453,7 +2493,7 @@ assert storedBlock.findDatanode(dn) < 0 
           .getStorageID());
       if (excessBlocks != null) {
         if (excessBlocks.remove(block)) {
-          excessBlocksCount--;
+          excessBlocksCount.decrementAndGet();
           if(blockLog.isDebugEnabled()) {
             blockLog.debug("BLOCK* removeStoredBlock: "
                 + block + " is removed from excessBlocks");
@@ -2798,7 +2838,7 @@ assert storedBlock.findDatanode(dn) < 0 
     // Remove the block from pendingReplications
     pendingReplications.remove(block);
     if (postponedMisreplicatedBlocks.remove(block)) {
-      postponedMisreplicatedBlocksCount--;
+      postponedMisreplicatedBlocksCount.decrementAndGet();
     }
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java Mon Jan 14 03:44:35 2013
@@ -236,13 +236,18 @@ public class BlockPlacementPolicyDefault
                + totalReplicasExpected + "\n"
                + e.getMessage());
       if (avoidStaleNodes) {
-        // excludedNodes now has - initial excludedNodes, any nodes that were
-        // chosen and nodes that were tried but were not chosen because they
-        // were stale, decommissioned or for any other reason a node is not
-        // chosen for write. Retry again now not avoiding stale node
+        // Retry chooseTarget again, this time not avoiding stale nodes.
+
+        // excludedNodes contains the initial excludedNodes and nodes that were
+        // not chosen because they were stale, decommissioned, etc.
+        // We need to additionally exclude the nodes that were added to the 
+        // result list in the successful calls to choose*() above.
         for (Node node : results) {
           oldExcludedNodes.put(node, node);
         }
+        // Set numOfReplicas, since it can get out of sync with the result list
+        // if the NotEnoughReplicasException was thrown in chooseRandom().
+        numOfReplicas = totalReplicasExpected - results.size();
         return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
             maxNodesPerRack, results, false);
       }
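The numOfReplicas resync guards against retrying with a stale count. A worked example with hypothetical numbers: if three replicas were requested and one target had already been added to results before chooseRandom() threw, the retry should ask for two, not three:

    // Worked example of the retry arithmetic (values are hypothetical).
    class RetryArithmetic {
      public static void main(String[] args) {
        int totalReplicasExpected = 3;
        int resultsSize = 1;  // targets chosen before the exception
        int numOfReplicas = totalReplicasExpected - resultsSize;
        System.out.println("retry requests " + numOfReplicas + " replicas");
      }
    }
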
@@ -542,7 +547,7 @@ public class BlockPlacementPolicyDefault
         if (LOG.isDebugEnabled()) {
           threadLocalBuilder.get().append(node.toString()).append(": ")
               .append("Node ").append(NodeBase.getPath(node))
-              .append(" is not chosen because the node is staled ");
+              .append(" is not chosen because the node is stale ");
         }
         return false;
       }

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/GenerationStamp.java Mon Jan 14 03:44:35 2013
@@ -17,19 +17,18 @@
  */
 package org.apache.hadoop.hdfs.server.common;
 
-import java.util.concurrent.atomic.AtomicLong;
-
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.SequentialNumber;
 
 /****************************************************************
  * A GenerationStamp is a Hadoop FS primitive, identified by a long.
  ****************************************************************/
 @InterfaceAudience.Private
-public class GenerationStamp implements Comparable<GenerationStamp> {
+public class GenerationStamp extends SequentialNumber {
   /**
-   * The first valid generation stamp.
+   * The last reserved generation stamp.
    */
-  public static final long FIRST_VALID_STAMP = 1000L;
+  public static final long LAST_RESERVED_STAMP = 1000L;
 
   /**
    * Generation stamp of blocks that pre-date the introduction
@@ -37,62 +36,10 @@ public class GenerationStamp implements 
    */
   public static final long GRANDFATHER_GENERATION_STAMP = 0;
 
-  private AtomicLong genstamp = new AtomicLong();
-
   /**
-   * Create a new instance, initialized to FIRST_VALID_STAMP.
+   * Create a new instance, initialized to {@link #LAST_RESERVED_STAMP}.
    */
   public GenerationStamp() {
-    this(GenerationStamp.FIRST_VALID_STAMP);
-  }
-
-  /**
-   * Create a new instance, initialized to the specified value.
-   */
-  GenerationStamp(long stamp) {
-    genstamp.set(stamp);
-  }
-
-  /**
-   * Returns the current generation stamp
-   */
-  public long getStamp() {
-    return genstamp.get();
-  }
-
-  /**
-   * Sets the current generation stamp
-   */
-  public void setStamp(long stamp) {
-    genstamp.set(stamp);
-  }
-
-  /**
-   * First increments the counter and then returns the stamp 
-   */
-  public long nextStamp() {
-    return genstamp.incrementAndGet();
-  }
-
-  @Override // Comparable
-  public int compareTo(GenerationStamp that) {
-    long stamp1 = this.genstamp.get();
-    long stamp2 = that.genstamp.get();
-    return stamp1 < stamp2 ? -1 :
-           stamp1 > stamp2 ? 1 : 0;
-  }
-
-  @Override // Object
-  public boolean equals(Object o) {
-    if (!(o instanceof GenerationStamp)) {
-      return false;
-    }
-    return compareTo((GenerationStamp)o) == 0;
-  }
-
-  @Override // Object
-  public int hashCode() {
-    long stamp = genstamp.get();
-    return (int) (stamp^(stamp>>>32));
+    super(LAST_RESERVED_STAMP);
   }
 }

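GenerationStamp's counter machinery (the AtomicLong plus compareTo, equals, and hashCode) moves into the shared org.apache.hadoop.util.SequentialNumber base class, leaving only the constructor behind. Below is a rough reconstruction of the contract the base class must provide for this refactoring to preserve behavior; it is inferred from the removed members, not taken from the actual hadoop-common source:

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical reconstruction of the SequentialNumber contract implied
    // by the deleted GenerationStamp code; method names are assumptions.
    abstract class SequentialNumberSketch {
      private final AtomicLong currentValue;

      protected SequentialNumberSketch(long initialValue) {
        currentValue = new AtomicLong(initialValue);
      }

      public long getCurrentValue() { return currentValue.get(); }
      public void setCurrentValue(long value) { currentValue.set(value); }

      /** Increment first, then return, matching the old nextStamp(). */
      public long nextValue() { return currentValue.incrementAndGet(); }

      @Override
      public boolean equals(Object that) {
        if (that == null || getClass() != that.getClass()) {
          return false;
        }
        return currentValue.get()
            == ((SequentialNumberSketch) that).currentValue.get();
      }

      @Override
      public int hashCode() {
        final long v = currentValue.get();
        return (int) (v ^ (v >>> 32));  // same folding as the removed code
      }
    }
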
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Mon Jan 14 03:44:35 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.DoAsParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.http.HtmlQuoting;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
@@ -69,6 +70,8 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.VersionInfo;
 
+import com.google.common.base.Charsets;
+
 import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
 
@@ -178,7 +181,7 @@ public class JspHelper {
         s.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
       } catch (IOException e) {
         deadNodes.add(chosenNode);
-        s.close();
+        IOUtils.closeSocket(s);
         s = null;
         failures++;
       }
@@ -228,7 +231,7 @@ public class JspHelper {
     }
     blockReader = null;
     s.close();
-    out.print(HtmlQuoting.quoteHtmlChars(new String(buf)));
+    out.print(HtmlQuoting.quoteHtmlChars(new String(buf, Charsets.UTF_8)));
   }
 
   public static void addTableHeader(JspWriter out) throws IOException {
@@ -382,6 +385,8 @@ public class JspHelper {
           int dint = d1.getVolumeFailures() - d2.getVolumeFailures();
           ret = (dint < 0) ? -1 : ((dint > 0) ? 1 : 0);
           break;
+        default:
+          throw new IllegalArgumentException("Invalid sortField");
         }
         return (sortOrder == SORT_ORDER_DSC) ? -ret : ret;
       }

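Two of the JspHelper hunks are defensive fixes: IOUtils.closeSocket(s) absorbs the secondary IOException that a bare s.close() could throw from inside a catch block, and new String(buf, Charsets.UTF_8) pins the byte-to-char decoding that new String(buf) would otherwise perform with the JVM's platform default charset (the patch uses Guava's Charsets because it predates Java 7's StandardCharsets). A standalone illustration of the charset pitfall:

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    // new String(bytes) decodes with the platform default charset, so the
    // same bytes can render differently on differently-configured hosts.
    public class CharsetPitfall {
      public static void main(String[] args) {
        byte[] buf = "réplica".getBytes(StandardCharsets.UTF_8);
        String platformDecoded = new String(buf);  // depends on file.encoding
        String explicitDecoded = new String(buf, StandardCharsets.UTF_8);
        System.out.println("default charset: " + Charset.defaultCharset());
        System.out.println(platformDecoded + " vs " + explicitDecoded);
      }
    }
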
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Mon Jan 14 03:44:35 2013
@@ -44,6 +44,8 @@ import org.apache.hadoop.util.VersionInf
 
 import com.google.common.base.Preconditions;
 
+import com.google.common.base.Charsets;
+
 
 
 /**
@@ -658,7 +660,7 @@ public abstract class Storage extends St
       FileLock res = null;
       try {
         res = file.getChannel().tryLock();
-        file.write(jvmName.getBytes());
+        file.write(jvmName.getBytes(Charsets.UTF_8));
         LOG.info("Lock on " + lockF + " acquired by nodename " + jvmName);
       } catch(OverlappingFileLockException oe) {
         LOG.error("It appears that another namenode " + file.readLine() 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Mon Jan 14 03:44:35 2013
@@ -703,7 +703,7 @@ class BlockPoolSliceScanner {
           (info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" : "none"; 
         buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
                                     " scan time : " +
-                                    "%-15d %s\n", info.block, 
+                                    "%-15d %s%n", info.block, 
                                     (info.lastScanOk ? "ok" : "failed"),
                                     scanType, scanTime,
                                     (scanTime <= 0) ? "not yet verified" : 
@@ -716,21 +716,21 @@ class BlockPoolSliceScanner {
     double pctProgress = (totalBytesToScan == 0) ? 100 :
                          (totalBytesToScan-bytesLeft)*100.0/totalBytesToScan;
                          
-    buffer.append(String.format("\nTotal Blocks                 : %6d" +
-                                "\nVerified in last hour        : %6d" +
-                                "\nVerified in last day         : %6d" +
-                                "\nVerified in last week        : %6d" +
-                                "\nVerified in last four weeks  : %6d" +
-                                "\nVerified in SCAN_PERIOD      : %6d" +
-                                "\nNot yet verified             : %6d" +
-                                "\nVerified since restart       : %6d" +
-                                "\nScans since restart          : %6d" +
-                                "\nScan errors since restart    : %6d" +
-                                "\nTransient scan errors        : %6d" +
-                                "\nCurrent scan rate limit KBps : %6d" +
-                                "\nProgress this period         : %6.0f%%" +
-                                "\nTime left in cur period      : %6.2f%%" +
-                                "\n", 
+    buffer.append(String.format("%nTotal Blocks                 : %6d" +
+                                "%nVerified in last hour        : %6d" +
+                                "%nVerified in last day         : %6d" +
+                                "%nVerified in last week        : %6d" +
+                                "%nVerified in last four weeks  : %6d" +
+                                "%nVerified in SCAN_PERIOD      : %6d" +
+                                "%nNot yet verified             : %6d" +
+                                "%nVerified since restart       : %6d" +
+                                "%nScans since restart          : %6d" +
+                                "%nScan errors since restart    : %6d" +
+                                "%nTransient scan errors        : %6d" +
+                                "%nCurrent scan rate limit KBps : %6d" +
+                                "%nProgress this period         : %6.0f%%" +
+                                "%nTime left in cur period      : %6.2f%%" +
+                                "%n", 
                                 total, inOneHour, inOneDay, inOneWeek,
                                 inFourWeeks, inScanPeriod, neverScanned,
                                 totalScans, totalScans, 

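The \n-to-%n change in these format strings swaps a hard-coded linefeed for the Formatter's platform line separator, so the scanner report uses \r\n on Windows and \n elsewhere:

    // In String.format, "%n" expands to System.lineSeparator(); "\n" is
    // always a bare LF regardless of platform.
    public class NewlineDemo {
      public static void main(String[] args) {
        String s = String.format("a%nb");
        System.out.println(s.equals("a" + System.lineSeparator() + "b")); // true
      }
    }
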
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Mon Jan 14 03:44:35 2013
@@ -78,6 +78,10 @@ public class BlockPoolSliceStorage exten
     this.clusterID = clusterId;
   }
 
+  private BlockPoolSliceStorage() {
+    super(NodeType.DATA_NODE);
+  }
+
   /**
    * Analyze storage directories. Recover from previous transitions if required.
    * 
@@ -378,7 +382,7 @@ public class BlockPoolSliceStorage exten
     if (!prevDir.exists())
       return;
     // read attributes out of the VERSION file of previous directory
-    DataStorage prevInfo = new DataStorage();
+    BlockPoolSliceStorage prevInfo = new BlockPoolSliceStorage();
     prevInfo.readPreviousVersionProperties(bpSd);
 
     // We allow rollback to a state, which is either consistent with

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Mon Jan 14 03:44:35 2013
@@ -648,7 +648,7 @@ class BlockSender implements java.io.Clo
 
       ByteBuffer pktBuf = ByteBuffer.allocate(pktBufSize);
 
-      while (endOffset > offset) {
+      while (endOffset > offset && !Thread.currentThread().isInterrupted()) {
         manageOsCache();
         long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
             transferTo, throttler);
@@ -656,16 +656,19 @@ class BlockSender implements java.io.Clo
         totalRead += len + (numberOfChunks(len) * checksumSize);
         seqno++;
       }
-      try {
-        // send an empty packet to mark the end of the block
-        sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
-            throttler);
-        out.flush();
-      } catch (IOException e) { //socket error
-        throw ioeToSocketException(e);
-      }
+      // If this thread was interrupted, then it did not send the full block.
+      if (!Thread.currentThread().isInterrupted()) {
+        try {
+          // send an empty packet to mark the end of the block
+          sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo,
+              throttler);
+          out.flush();
+        } catch (IOException e) { //socket error
+          throw ioeToSocketException(e);
+        }
 
-      sentEntireByteRange = true;
+        sentEntireByteRange = true;
+      }
     } finally {
       if (clientTraceFmt != null) {
         final long endTime = System.nanoTime();

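The BlockSender change makes the send loop cooperative with interruption: it re-checks Thread.currentThread().isInterrupted() between packets and, if interrupted, skips the end-of-block trailer and leaves sentEntireByteRange false so the caller can tell the transfer was cut short. A minimal sketch of the pattern (sendChunk and sendTrailer are hypothetical stand-ins for sendPacket):

    // Cooperative-interruption pattern used by the patched send loop.
    class InterruptibleSender {
      private boolean sentEntireByteRange = false;

      void sendBlock(long endOffset) {
        long offset = 0;
        while (offset < endOffset && !Thread.currentThread().isInterrupted()) {
          offset += sendChunk(offset);  // one unit of work, then re-check
        }
        if (!Thread.currentThread().isInterrupted()) {
          sendTrailer();                // mark complete only if uninterrupted
          sentEntireByteRange = true;
        }
      }

      private long sendChunk(long offset) { return 1024; }
      private void sendTrailer() { }
    }
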
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Jan 14 03:44:35 2013
@@ -98,7 +98,6 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
@@ -115,6 +114,7 @@ import org.apache.hadoop.hdfs.protocolPB
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -970,29 +970,27 @@ public class DataNode extends Configured
     dnId.setStorageID(createNewStorageId(dnId.getXferPort()));
   }
   
+  /**
+   * @return a unique storage ID of form "DS-randInt-ipaddr-port-timestamp"
+   */
   static String createNewStorageId(int port) {
-    /* Return 
-     * "DS-randInt-ipaddr-currentTimeMillis"
-     * It is considered extermely rare for all these numbers to match
-     * on a different machine accidentally for the following 
-     * a) SecureRandom(INT_MAX) is pretty much random (1 in 2 billion), and
-     * b) Good chance ip address would be different, and
-     * c) Even on the same machine, Datanode is designed to use different ports.
-     * d) Good chance that these are started at different times.
-     * For a confict to occur all the 4 above have to match!.
-     * The format of this string can be changed anytime in future without
-     * affecting its functionality.
-     */
+    // It is unlikely that we will create a non-unique storage ID
+    // for the following reasons:
+    // a) SecureRandom is a cryptographically strong random number generator
+    // b) IP addresses will likely differ on different hosts
+    // c) DataNode xfer ports will differ on the same host
+    // d) StorageIDs will likely be generated at different times (in ms)
+    // A conflict requires that all four conditions are violated.
+    // NB: The format of this string can be changed in the future without
+    // requiring that old SotrageIDs be updated.
     String ip = "unknownIP";
     try {
       ip = DNS.getDefaultIP("default");
     } catch (UnknownHostException ignored) {
-      LOG.warn("Could not find ip address of \"default\" inteface.");
+      LOG.warn("Could not find an IP address for the \"default\" inteface.");
     }
-    
     int rand = DFSUtil.getSecureRandom().nextInt(Integer.MAX_VALUE);
-    return "DS-" + rand + "-" + ip + "-" + port + "-"
-        + Time.now();
+    return "DS-" + rand + "-" + ip + "-" + port + "-" + Time.now();
   }
   
   /** Ensure the authentication method is kerberos */
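The rewritten comment documents the storage ID layout as "DS-randInt-ipaddr-port-timestamp". For illustration, an ID assembled from hypothetical values:

    // Hypothetical example of the storage ID format described above.
    class StorageIdExample {
      public static void main(String[] args) {
        int rand = 1937581503;        // SecureRandom int in [0, 2^31)
        String ip = "10.0.0.12";      // DNS.getDefaultIP("default")
        int port = 50010;             // DataNode xfer port
        long now = 1358135075000L;    // Time.now() in milliseconds
        System.out.println("DS-" + rand + "-" + ip + "-" + port + "-" + now);
      }
    }
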
@@ -1468,7 +1466,7 @@ public class DataNode extends Configured
         // read ack
         if (isClient) {
           DNTransferAckProto closeAck = DNTransferAckProto.parseFrom(
-              HdfsProtoUtil.vintPrefixed(in));
+              PBHelper.vintPrefixed(in));
           if (LOG.isDebugEnabled()) {
             LOG.debug(getClass().getSimpleName() + ": close-ack=" + closeAck);
           }
@@ -1908,10 +1906,11 @@ public class DataNode extends Configured
   }
 
   /**
-   * Get namenode corresponding to a block pool
+   * Get the NameNode corresponding to the given block pool.
+   *
    * @param bpid Block pool Id
    * @return Namenode corresponding to the bpid
-   * @throws IOException
+   * @throws IOException if unable to get the corresponding NameNode
    */
   public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid)
       throws IOException {
@@ -1935,11 +1934,6 @@ public class DataNode extends Configured
     final String bpid = block.getBlockPoolId();
     DatanodeProtocolClientSideTranslatorPB nn =
       getActiveNamenodeForBP(block.getBlockPoolId());
-    if (nn == null) {
-      throw new IOException(
-          "Unable to synchronize block " + rBlock + ", since this DN "
-          + " has not acknowledged any NN as active.");
-    }
     
     long recoveryId = rBlock.getNewGenerationStamp();
     if (LOG.isDebugEnabled()) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Mon Jan 14 03:44:35 2013
@@ -62,7 +62,7 @@ import org.apache.hadoop.util.DiskChecke
  */
 @InterfaceAudience.Private
 public class DataStorage extends Storage {
-  // Constants
+
   public final static String BLOCK_SUBDIR_PREFIX = "subdir";
   final static String BLOCK_FILE_PREFIX = "blk_";
   final static String COPY_FILE_PREFIX = "dncp_";
@@ -71,13 +71,13 @@ public class DataStorage extends Storage
   public final static String STORAGE_DIR_FINALIZED = "finalized";
   public final static String STORAGE_DIR_TMP = "tmp";
 
-  /** Access to this variable is guarded by "this" */
+  /** Unique storage ID. {@see DataNode#createNewStorageId(int)} for details */
   private String storageID;
 
-  // flag to ensure initialzing storage occurs only once
-  private boolean initilized = false;
+  // Flag to ensure we only initialize storage once
+  private boolean initialized = false;
   
-  // BlockPoolStorage is map of <Block pool Id, BlockPoolStorage>
+  // Maps block pool IDs to block pool storage
   private Map<String, BlockPoolSliceStorage> bpStorageMap
       = Collections.synchronizedMap(new HashMap<String, BlockPoolSliceStorage>());
 
@@ -130,7 +130,7 @@ public class DataStorage extends Storage
   synchronized void recoverTransitionRead(DataNode datanode,
       NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt)
       throws IOException {
-    if (initilized) {
+    if (initialized) {
       // DN storage has been initialized, no need to do anything
       return;
     }
@@ -200,7 +200,7 @@ public class DataStorage extends Storage
     this.writeAll();
     
     // 4. mark DN storage is initilized
-    this.initilized = true;
+    this.initialized = true;
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Jan 14 03:44:35 2013
@@ -42,7 +42,6 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor.InvalidMagicNumberException;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
@@ -56,6 +55,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -144,7 +144,7 @@ class DataXceiver extends Receiver imple
   /** Return the datanode object. */
   DataNode getDataNode() {return datanode;}
   
-  private OutputStream getOutputStream() throws IOException {
+  private OutputStream getOutputStream() {
     return socketOut;
   }
 
@@ -284,7 +284,7 @@ class DataXceiver extends Receiver imple
         // to respond with a Status enum.
         try {
           ClientReadStatusProto stat = ClientReadStatusProto.parseFrom(
-              HdfsProtoUtil.vintPrefixed(in));
+              PBHelper.vintPrefixed(in));
           if (!stat.hasStatus()) {
             LOG.warn("Client " + s.getInetAddress() + " did not send a valid status " +
                      "code after reading. Will close connection.");
@@ -445,7 +445,7 @@ class DataXceiver extends Receiver imple
           // read connect ack (only for clients, not for replication req)
           if (isClient) {
             BlockOpResponseProto connectAck =
-              BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(mirrorIn));
+              BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(mirrorIn));
             mirrorInStatus = connectAck.getStatus();
             firstBadLink = connectAck.getFirstBadLink();
             if (LOG.isDebugEnabled() || mirrorInStatus != SUCCESS) {
@@ -606,7 +606,7 @@ class DataXceiver extends Receiver imple
           .setBytesPerCrc(bytesPerCRC)
           .setCrcPerBlock(crcPerBlock)
           .setMd5(ByteString.copyFrom(md5.getDigest()))
-          .setCrcType(HdfsProtoUtil.toProto(checksum.getChecksumType()))
+          .setCrcType(PBHelper.convert(checksum.getChecksumType()))
           )
         .build()
         .writeDelimitedTo(out);
@@ -765,7 +765,7 @@ class DataXceiver extends Receiver imple
       // receive the response from the proxy
       
       BlockOpResponseProto copyResponse = BlockOpResponseProto.parseFrom(
-          HdfsProtoUtil.vintPrefixed(proxyReply));
+          PBHelper.vintPrefixed(proxyReply));
 
       if (copyResponse.getStatus() != SUCCESS) {
         if (copyResponse.getStatus() == ERROR_ACCESS_TOKEN) {

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java Mon Jan 14 03:44:35 2013
@@ -79,9 +79,6 @@ public class DatanodeJspHelper {
            .getCanonicalHostName();
   }
 
-  private static final SimpleDateFormat lsDateFormat =
-    new SimpleDateFormat("yyyy-MM-dd HH:mm");
-
   /**
    * Get the default chunk size.
    * @param conf the configuration
@@ -205,8 +202,8 @@ public class DatanodeJspHelper {
               + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnAddr);
             cols[0] = "<a href=\"" + datanodeUrl + "\">"
               + HtmlQuoting.quoteHtmlChars(localFileName) + "</a>";
-            cols[5] = lsDateFormat.format(new Date((files[i]
-              .getModificationTime())));
+            cols[5] = new SimpleDateFormat("yyyy-MM-dd HH:mm").format(
+                new Date((files[i].getModificationTime())));
             cols[6] = files[i].getPermission().toString();
             cols[7] = files[i].getOwner();
             cols[8] = files[i].getGroup();

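Dropping the shared static lsDateFormat is a thread-safety fix: java.text.SimpleDateFormat carries mutable internal state, so concurrent JSP requests formatting through one shared instance can corrupt each other's output. The new code creates a short-lived instance per call, the simplest safe alternative. A standalone sketch of the hazard:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    // SimpleDateFormat is not thread-safe: two threads sharing one
    // instance can interleave its internal state and emit garbage.
    public class SharedFormatHazard {
      static final SimpleDateFormat SHARED =
          new SimpleDateFormat("yyyy-MM-dd HH:mm");

      public static void main(String[] args) {
        Runnable fmt = new Runnable() {
          @Override
          public void run() {
            for (int i = 0; i < 10000; i++) {
              SHARED.format(new Date());  // unsafe under concurrency
            }
          }
        };
        new Thread(fmt).start();
        new Thread(fmt).start();
        // Safe per-call variant, as in the patched code:
        // new SimpleDateFormat("yyyy-MM-dd HH:mm").format(new Date());
      }
    }
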
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java Mon Jan 14 03:44:35 2013
@@ -203,9 +203,6 @@ abstract public class ReplicaInfo extend
       throw new IOException("detachBlock:Block not found. " + this);
     }
     File meta = getMetaFile();
-    if (meta == null) {
-      throw new IOException("Meta file not found for block " + this);
-    }
 
     if (HardLink.getLinkCount(file) > numLinks) {
       DataNode.LOG.info("CopyOnWrite for block " + this);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java Mon Jan 14 03:44:35 2013
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset;
 
 import java.io.Closeable;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.InputStream;
 
 import org.apache.hadoop.io.IOUtils;
@@ -30,9 +32,9 @@ public class ReplicaInputStreams impleme
   private final InputStream checksumIn;
 
   /** Create an object with a data input stream and a checksum input stream. */
-  public ReplicaInputStreams(InputStream dataIn, InputStream checksumIn) {
-    this.dataIn = dataIn;
-    this.checksumIn = checksumIn;
+  public ReplicaInputStreams(FileDescriptor dataFd, FileDescriptor checksumFd) {
+    this.dataIn = new FileInputStream(dataFd);
+    this.checksumIn = new FileInputStream(checksumFd);
   }
 
   /** @return the data input stream. */

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Jan 14 03:44:35 2013
@@ -390,8 +390,7 @@ class FsDatasetImpl implements FsDataset
     if (ckoff > 0) {
       metaInFile.seek(ckoff);
     }
-    return new ReplicaInputStreams(new FileInputStream(blockInFile.getFD()),
-                                new FileInputStream(metaInFile.getFD()));
+    return new ReplicaInputStreams(blockInFile.getFD(), metaInFile.getFD());
   }
 
   static File moveBlockFiles(Block b, File srcfile, File destdir

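Moving the FileInputStream construction into ReplicaInputStreams (which now receives the raw FileDescriptors of the already-positioned RandomAccessFiles) still honors the metaInFile.seek(ckoff) above, because a stream opened on a descriptor shares that descriptor's current file offset. A standalone demonstration:

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    // A FileInputStream opened on a RandomAccessFile's descriptor starts
    // reading at the file's current position, not at byte 0.
    public class SharedFdOffset {
      public static void main(String[] args) throws IOException {
        File f = File.createTempFile("fd", ".dat");
        f.deleteOnExit();
        FileOutputStream out = new FileOutputStream(f);
        out.write("0123456789".getBytes("US-ASCII"));
        out.close();
        RandomAccessFile raf = new RandomAccessFile(f, "r");
        raf.seek(4);                               // position the descriptor
        FileInputStream in = new FileInputStream(raf.getFD());
        System.out.println((char) in.read());      // prints '4', not '0'
        raf.close();
      }
    }
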
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java Mon Jan 14 03:44:35 2013
@@ -19,16 +19,20 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.BufferedReader;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
-import java.io.FileReader;
 import java.io.IOException;
-import java.io.PrintStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 
+import com.google.common.base.Charsets;
+
 class RollingLogsImpl implements RollingLogs {
   private static final String CURR_SUFFIX = ".curr";
   private static final String PREV_SUFFIX = ".prev";
@@ -40,7 +44,7 @@ class RollingLogsImpl implements Rolling
 
   private final File curr;
   private final File prev;
-  private PrintStream out; //require synchronized access
+  private PrintWriter out; //require synchronized access
 
   private Appender appender = new Appender() {
     @Override
@@ -82,7 +86,8 @@ class RollingLogsImpl implements Rolling
   RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
     curr = new File(dir, filePrefix + CURR_SUFFIX);
     prev = new File(dir, filePrefix + PREV_SUFFIX);
-    out = new PrintStream(new FileOutputStream(curr, true));
+    out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
+        curr, true), Charsets.UTF_8));
   }
 
   @Override
@@ -108,7 +113,8 @@ class RollingLogsImpl implements Rolling
     synchronized(this) {
       appender.close();
       final boolean renamed = curr.renameTo(prev);
-      out = new PrintStream(new FileOutputStream(curr, true));
+      out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
+          curr, true), Charsets.UTF_8));
       if (!renamed) {
         throw new IOException("Failed to rename " + curr + " to " + prev);
       }
@@ -163,7 +169,8 @@ class RollingLogsImpl implements Rolling
         reader = null;
       }
       
-      reader = new BufferedReader(new FileReader(file));
+      reader = new BufferedReader(new InputStreamReader(new FileInputStream(
+          file), Charsets.UTF_8));
       return true;
     }
     

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ClusterJspHelper.java Mon Jan 14 03:44:35 2013
@@ -48,6 +48,8 @@ import org.codehaus.jackson.map.ObjectMa
 import org.codehaus.jackson.type.TypeReference;
 import org.znerd.xmlenc.XMLOutputter;
 
+import com.google.common.base.Charsets;
+
 /**
  * This class generates the data that is needed to be displayed on cluster web 
  * console.
@@ -873,7 +875,7 @@ class ClusterJspHelper {
     URLConnection connection = url.openConnection();
     BufferedReader in = new BufferedReader(
                             new InputStreamReader(
-                            connection.getInputStream()));
+                            connection.getInputStream(), Charsets.UTF_8));
     String inputLine;
     while ((inputLine = in.readLine()) != null) {
       out.append(inputLine);

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java Mon Jan 14 03:44:35 2013
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.server.common.Storage;
@@ -41,6 +43,7 @@ import org.apache.hadoop.security.UserGr
  *  int, int, byte[])
  */
 class EditLogBackupOutputStream extends EditLogOutputStream {
+  private static Log LOG = LogFactory.getLog(EditLogFileOutputStream.class);
   static int DEFAULT_BUFFER_SIZE = 256;
 
   private final JournalProtocol backupNode;  // RPC proxy to backup node
@@ -117,6 +120,11 @@ class EditLogBackupOutputStream extends 
   protected void flushAndSync(boolean durable) throws IOException {
     assert out.getLength() == 0 : "Output buffer is not empty";
     
+    if (doubleBuf.isFlushed()) {
+      LOG.info("Nothing to flush");
+      return;
+    }
+
     int numReadyTxns = doubleBuf.countReadyTxns();
     long firstTxToFlush = doubleBuf.getFirstReadyTxId();
     

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java Mon Jan 14 03:44:35 2013
@@ -140,10 +140,8 @@ public class EditLogFileOutputStream ext
         fc.close();
         fc = null;
       }
-      if (fp != null) {
-        fp.close();
-        fp = null;
-      }
+      fp.close();
+      fp = null;
     } finally {
       IOUtils.cleanup(FSNamesystem.LOG, fc, fp);
       doubleBuf = null;

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Jan 14 03:44:35 2013
@@ -81,8 +81,9 @@ import com.google.common.base.Preconditi
 public class FSDirectory implements Closeable {
   private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) {
     final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota(
+        namesystem.allocateNewInodeId(),
         INodeDirectory.ROOT_NAME,
-        namesystem.createFsOwnerPermissions(new FsPermission((short)0755)));
+        namesystem.createFsOwnerPermissions(new FsPermission((short) 0755)));
     final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r);
     s.setSnapshotQuota(0);
     return s;
@@ -262,7 +263,9 @@ public class FSDirectory implements Clos
     if (!mkdirs(parent.toString(), permissions, true, modTime)) {
       return null;
     }
+    long id = namesystem.allocateNewInodeId();
     INodeFileUnderConstruction newNode = new INodeFileUnderConstruction(
+                                 id,
                                  permissions,replication,
                                  preferredBlockSize, modTime, clientName, 
                                  clientMachine, clientNode);
@@ -284,7 +287,8 @@ public class FSDirectory implements Clos
     return newNode;
   }
 
-  INode unprotectedAddFile( String path, 
+  INode unprotectedAddFile( long id,
+                            String path, 
                             PermissionStatus permissions,
                             short replication,
                             long modificationTime,
@@ -296,13 +300,11 @@ public class FSDirectory implements Clos
     final INode newNode;
     assert hasWriteLock();
     if (underConstruction) {
-      newNode = new INodeFileUnderConstruction(
-          permissions, replication,
-          preferredBlockSize, modificationTime, clientName, 
-          clientMachine, null);
+      newNode = new INodeFileUnderConstruction(id, permissions, replication,
+          preferredBlockSize, modificationTime, clientName, clientMachine, null);
     } else {
-      newNode = new INodeFile(permissions, BlockInfo.EMPTY_ARRAY, replication,
-                              modificationTime, atime, preferredBlockSize);
+      newNode = new INodeFile(id, permissions, BlockInfo.EMPTY_ARRAY,
+          replication, modificationTime, atime, preferredBlockSize);
     }
 
     try {
@@ -399,19 +401,16 @@ public class FSDirectory implements Clos
   /**
    * Remove a block from the file.
    */
-  boolean removeBlock(String path, INodeFileUnderConstruction fileNode, 
+  void removeBlock(String path, INodeFileUnderConstruction fileNode,
                       Block block) throws IOException {
     waitForReady();
 
     writeLock();
     try {
       unprotectedRemoveBlock(path, fileNode, block);
-      // write modified block locations to log
-      fsImage.getEditLog().logOpenFile(path, fileNode);
     } finally {
       writeUnlock();
     }
-    return true;
   }
   
   void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode, 
@@ -1634,8 +1633,9 @@ public class FSDirectory implements Clos
       // create directories beginning from the first null index
       for(; i < inodes.length; i++) {
         pathbuilder.append(Path.SEPARATOR + names[i]);
-        unprotectedMkdir(inodesInPath, i, components[i],
-            (i < lastInodeIndex) ? parentPermissions : permissions, now);
+        unprotectedMkdir(namesystem.allocateNewInodeId(), inodesInPath, i,
+            components[i], (i < lastInodeIndex) ? parentPermissions
+                : permissions, now);
         if (inodes[i] == null) {
           return false;
         }
@@ -1657,7 +1657,7 @@ public class FSDirectory implements Clos
     return true;
   }
 
-  INode unprotectedMkdir(String src, PermissionStatus permissions,
+  INode unprotectedMkdir(long inodeId, String src, PermissionStatus permissions,
                           long timestamp) throws QuotaExceededException,
                           UnresolvedLinkException {
     assert hasWriteLock();
@@ -1666,7 +1666,8 @@ public class FSDirectory implements Clos
         components.length, false);
     INode[] inodes = inodesInPath.getINodes();
     final int pos = inodes.length - 1;
-    unprotectedMkdir(inodesInPath, pos, components[pos], permissions, timestamp);
+    unprotectedMkdir(inodeId, inodesInPath, pos, components[pos], permissions,
+        timestamp);
     return inodes[pos];
   }
 
@@ -1674,11 +1675,12 @@ public class FSDirectory implements Clos
    * The parent path to the directory is at [0, pos-1].
    * All ancestors exist. Newly created one stored at index pos.
    */
-  private void unprotectedMkdir(INodesInPath inodesInPath, int pos,
-      byte[] name, PermissionStatus permission,
-      long timestamp) throws QuotaExceededException {
+  private void unprotectedMkdir(long inodeId, INodesInPath inodesInPath,
+      int pos, byte[] name, PermissionStatus permission, long timestamp)
+      throws QuotaExceededException {
     assert hasWriteLock();
-    final INodeDirectory dir = new INodeDirectory(name, permission, timestamp);
+    final INodeDirectory dir = new INodeDirectory(inodeId, name, permission,
+        timestamp);
     if (addChild(inodesInPath, pos, dir, true)) {
       inodesInPath.setINode(pos, dir);
     }
@@ -2248,9 +2250,10 @@ public class FSDirectory implements Clos
     }
     final String userName = dirPerms.getUserName();
     INodeSymlink newNode  = null;
+    long id = namesystem.allocateNewInodeId();
     writeLock();
     try {
-      newNode = unprotectedAddSymlink(path, target, modTime, modTime,
+      newNode = unprotectedAddSymlink(id, path, target, modTime, modTime,
           new PermissionStatus(userName, null, FsPermission.getDefault()));
     } finally {
       writeUnlock();
@@ -2270,12 +2273,13 @@ public class FSDirectory implements Clos
   /**
    * Add the specified path into the namespace. Invoked from edit log processing.
    */
-  INodeSymlink unprotectedAddSymlink(String path, String target, long mtime, 
-                                  long atime, PermissionStatus perm) 
+  INodeSymlink unprotectedAddSymlink(long id, String path, String target,
+      long mtime, long atime, PermissionStatus perm)
       throws UnresolvedLinkException, QuotaExceededException {
     assert hasWriteLock();
-    final INodeSymlink symlink = new INodeSymlink(target, mtime, atime, perm);
-    return addINode(path, symlink)? symlink: null;
+    final INodeSymlink symlink = new INodeSymlink(id, target, mtime, atime,
+        perm);
+    return addINode(path, symlink) ? symlink : null;
   }
   
   /**

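Common to all the FSDirectory changes above: every path that materializes an inode (file create, mkdir, symlink, and their edit-log replay variants) now obtains an id from the namesystem before invoking the constructor, so an inode's id is fixed at construction time. A hypothetical sketch of the allocator those calls assume — the real one lives in FSNamesystem (see below), and the reserved-range constant here is a placeholder value, not the actual INodeId.LAST_RESERVED_ID:

    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical allocator: strictly increasing ids, starting above a
    // reserved range so fixed ids (e.g. the root) can never collide.
    class InodeIdAllocatorSketch {
      static final long LAST_RESERVED_ID = 1000L;  // placeholder value
      private final AtomicLong current = new AtomicLong(LAST_RESERVED_ID);

      long allocateNewInodeId() {
        return current.incrementAndGet();
      }
    }
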
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Mon Jan 14 03:44:35 2013
@@ -121,11 +121,8 @@ public class FSEditLogLoader {
     long lastTxId = in.getLastTxId();
     long numTxns = (lastTxId - expectedStartingTxId) + 1;
     long lastLogTime = now();
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("edit log length: " + in.length() + ", start txid: "
-          + expectedStartingTxId + ", last txid: " + lastTxId);
-    }
+    long lastInodeId = fsNamesys.getLastInodeId();
+    
     try {
       while (true) {
         try {
@@ -171,7 +168,10 @@ public class FSEditLogLoader {
             }
           }
           try {
-            applyEditLogOp(op, fsDir, in.getVersion());
+            long inodeId = applyEditLogOp(op, fsDir, in.getVersion());
+            if (lastInodeId < inodeId) {
+              lastInodeId = inodeId;
+            }
           } catch (Throwable e) {
             LOG.error("Encountered exception on operation " + op, e);
             MetaRecoveryContext.editLogLoaderPrompt("Failed to " +
@@ -206,6 +206,7 @@ public class FSEditLogLoader {
         }
       }
     } finally {
+      fsNamesys.resetLastInodeId(lastInodeId);
       if(closeOnExit) {
         in.close();
       }
@@ -224,9 +225,9 @@ public class FSEditLogLoader {
   }
   
   @SuppressWarnings("deprecation")
-  private void applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
+  private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
       int logVersion) throws IOException {
-
+    long inodeId = INodeId.GRANDFATHER_INODE_ID;
     if (LOG.isTraceEnabled()) {
       LOG.trace("replaying edit log: " + op);
     }
@@ -257,11 +258,11 @@ public class FSEditLogLoader {
         assert addCloseOp.blocks.length == 0;
 
         // add to the file tree
-        newFile = (INodeFile)fsDir.unprotectedAddFile(
-            addCloseOp.path, addCloseOp.permissions,
-            replication, addCloseOp.mtime,
-            addCloseOp.atime, addCloseOp.blockSize,
-            true, addCloseOp.clientName, addCloseOp.clientMachine);
+        inodeId = fsNamesys.allocateNewInodeId();
+        newFile = (INodeFile) fsDir.unprotectedAddFile(inodeId,
+            addCloseOp.path, addCloseOp.permissions, replication,
+            addCloseOp.mtime, addCloseOp.atime, addCloseOp.blockSize, true,
+            addCloseOp.clientName, addCloseOp.clientMachine);
         fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
 
       } else { // This is OP_ADD on an existing file
@@ -374,7 +375,8 @@ public class FSEditLogLoader {
     }
     case OP_MKDIR: {
       MkdirOp mkdirOp = (MkdirOp)op;
-      fsDir.unprotectedMkdir(mkdirOp.path, mkdirOp.permissions,
+      inodeId = fsNamesys.allocateNewInodeId();
+      fsDir.unprotectedMkdir(inodeId, mkdirOp.path, mkdirOp.permissions,
                              mkdirOp.timestamp);
       break;
     }
@@ -427,9 +429,10 @@ public class FSEditLogLoader {
     }
     case OP_SYMLINK: {
       SymlinkOp symlinkOp = (SymlinkOp)op;
-      fsDir.unprotectedAddSymlink(symlinkOp.path, symlinkOp.value,
-                               symlinkOp.mtime, symlinkOp.atime,
-                               symlinkOp.permissionStatus);
+      inodeId = fsNamesys.allocateNewInodeId();
+      fsDir.unprotectedAddSymlink(inodeId, symlinkOp.path,
+                                  symlinkOp.value, symlinkOp.mtime, 
+                                  symlinkOp.atime, symlinkOp.permissionStatus);
       break;
     }
     case OP_RENAME: {
@@ -489,6 +492,7 @@ public class FSEditLogLoader {
     default:
       throw new IOException("Invalid operation read " + op.opCode);
     }
+    return inodeId;
   }
   
   private static String formatEditLogReplayError(EditLogInputStream in,

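The loader change is the consumer side of the same scheme: applyEditLogOp() now returns the inode id it allocated, or INodeId.GRANDFATHER_INODE_ID when the op creates nothing, and the loop keeps a running maximum that is pushed back to the namesystem in the finally block even if replay aborts partway. A condensed sketch of that bookkeeping, with a stubbed op stream:

    // Track the highest inode id any replayed op allocated, and hand it
    // back to the namesystem exactly once when replay ends.
    class ReplaySketch {
      static final long GRANDFATHER_INODE_ID = 0;  // assumed "no id allocated" marker

      interface Op {
        long apply();                              // stand-in for applyEditLogOp
      }

      static long replay(Iterable<Op> ops, long lastInodeId) {
        try {
          for (Op op : ops) {
            long inodeId = op.apply();
            if (lastInodeId < inodeId) {
              lastInodeId = inodeId;
            }
          }
        } finally {
          // the real code calls fsNamesys.resetLastInodeId(lastInodeId) here
        }
        return lastInodeId;
      }
    }
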
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Mon Jan 14 03:44:35 2013
@@ -59,6 +59,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.util.Canceler;
 import org.apache.hadoop.hdfs.util.MD5FileUtils;
 import org.apache.hadoop.io.MD5Hash;
+import org.apache.hadoop.util.IdGenerator;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -92,6 +93,7 @@ public class FSImage implements Closeabl
   final private Configuration conf;
 
   protected NNStorageRetentionManager archivalManager;
+  protected IdGenerator blockIdGenerator;
 
   /**
    * Construct an FSImage
@@ -137,6 +139,9 @@ public class FSImage implements Closeabl
     Preconditions.checkState(fileCount == 1,
         "FSImage.format should be called with an uninitialized namesystem, has " +
         fileCount + " files");
+    // The block id generator is created when the namesystem is formatted.
+    // Currently there is only one kind of BlockIdGenerator.
+    blockIdGenerator = createBlockIdGenerator(fsn);
     NamespaceInfo ns = NNStorage.newNamespaceInfo();
     ns.clusterID = clusterId;
     
@@ -253,6 +258,7 @@ public class FSImage implements Closeabl
       doRollback();
       break;
     case REGULAR:
+    default:
       // just load the image
     }
     
@@ -737,6 +743,9 @@ public class FSImage implements Closeabl
     FSImageFormat.Loader loader = new FSImageFormat.Loader(
         conf, target);
     loader.load(curFile);
+    // The block id generator is recreated after the image is loaded.
+    // Currently there is only one kind of BlockIdGenerator.
+    blockIdGenerator = createBlockIdGenerator(target);
     target.setBlockPoolId(this.getBlockPoolID());
 
     // Check that the image digest we loaded matches up with what
@@ -1165,4 +1174,12 @@ public class FSImage implements Closeabl
   public synchronized long getMostRecentCheckpointTxId() {
     return storage.getMostRecentCheckpointTxId();
   }
+
+  public long getUniqueBlockId() {
+    return blockIdGenerator.nextValue();
+  }
+
+  public IdGenerator createBlockIdGenerator(FSNamesystem fsn) {
+    return new RandomBlockIdGenerator(fsn);
+  }
 }

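With getUniqueBlockId() and createBlockIdGenerator() in place, block id generation becomes pluggable behind the IdGenerator interface, and allocateBlock() in FSNamesystem (further down) drops its inline retry loop. A sketch of what a RandomBlockIdGenerator-shaped implementation plausibly does — the collision predicate is an assumption standing in for the block-map lookup the removed isValidBlock() performed:

    import java.util.Random;
    import java.util.function.LongPredicate;

    // Draw random candidate ids, retrying while the candidate already
    // names a live block.
    class RandomBlockIdSketch {
      private final Random random = new Random();
      private final LongPredicate isInUse;  // assumed to be backed by the block map

      RandomBlockIdSketch(LongPredicate isInUse) {
        this.isInUse = isInUse;
      }

      long nextValue() {
        long id = random.nextLong();
        while (isInUse.test(id)) {          // same loop allocateBlock() used to inline
          id = random.nextLong();
        }
        return id;
      }
    }
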
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Mon Jan 14 03:44:35 2013
@@ -220,7 +220,8 @@ class FSImageFormat {
         in = compression.unwrapInputStream(fin);
 
         LOG.info("Loading image file " + curFile + " using " + compression);
-
+        // Reset the inode id counter. TODO: remove this once inodeId is persisted in the fsimage.
+        namesystem.resetLastInodeIdWithoutChecking(INodeId.LAST_RESERVED_ID); 
         // load all inodes
         LOG.info("Number of files = " + numFiles);
         if (LayoutVersion.supports(Feature.FSIMAGE_NAME_OPTIMIZATION,
@@ -400,6 +401,8 @@ class FSImageFormat {
     long blockSize = 0;
     
     int imgVersion = getLayoutVersion();
+    long inodeId = namesystem.allocateNewInodeId();
+    
     short replication = in.readShort();
     replication = namesystem.getBlockManager().adjustReplication(replication);
     modificationTime = in.readLong();
@@ -437,7 +440,7 @@ class FSImageFormat {
     
     PermissionStatus permissions = PermissionStatus.read(in);
 
-    return INode.newINode(permissions, blocks, symlink, replication,
+    return INode.newINode(inodeId, permissions, blocks, symlink, replication,
         modificationTime, atime, nsQuota, dsQuota, blockSize);
   }
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java Mon Jan 14 03:44:35 2013
@@ -107,7 +107,9 @@ public class FSImageSerialization {
     int numLocs = in.readInt();
     assert numLocs == 0 : "Unexpected block locations";
 
-    return new INodeFileUnderConstruction(name, 
+    // TODO: read the inodeId from the fsimage once inodeId is persisted there
+    return new INodeFileUnderConstruction(INodeId.GRANDFATHER_INODE_ID,
+                                          name,
                                           blockReplication, 
                                           modificationTime,
                                           preferredBlockSize,

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Jan 14 03:44:35 2013
@@ -78,8 +78,9 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.FileWriter;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.lang.management.ManagementFactory;
@@ -211,6 +212,7 @@ import org.apache.hadoop.util.VersionInf
 import org.mortbay.util.ajax.JSON;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
@@ -383,14 +385,43 @@ public class FSNamesystem implements Nam
 
   private final boolean haEnabled;
     
+  private INodeId inodeId;
+  
+  /**
+   * Set the last allocated inode id when the fsimage or edit log is loaded.
+   */
+  public void resetLastInodeId(long newValue) throws IOException {
+    try {
+      inodeId.skipTo(newValue);
+    } catch(IllegalStateException ise) {
+      throw new IOException(ise);
+    }
+  }
+
+  /** Reset to an arbitrary value without the monotonicity check; used by image loading and tests. */
+  void resetLastInodeIdWithoutChecking(long newValue) {
+    inodeId.setCurrentValue(newValue);
+  }
+  
+  /** @return the last inode ID. */
+  public long getLastInodeId() {
+    return inodeId.getCurrentValue();
+  }
+
+  /** Allocate a new inode ID. */
+  public long allocateNewInodeId() {
+    return inodeId.nextValue();
+  }
+  
   /**
    * Clear all loaded data
    */
   void clear() {
     dir.reset();
     dtSecretManager.reset();
-    generationStamp.setStamp(GenerationStamp.FIRST_VALID_STAMP);
+    generationStamp.setCurrentValue(GenerationStamp.LAST_RESERVED_STAMP);
     leaseManager.removeAllLeases();
+    inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID);
   }
 
   @VisibleForTesting
@@ -542,6 +573,8 @@ public class FSNamesystem implements Nam
       this.standbyShouldCheckpoint = conf.getBoolean(
           DFS_HA_STANDBY_CHECKPOINTS_KEY, DFS_HA_STANDBY_CHECKPOINTS_DEFAULT);
       
+      this.inodeId = new INodeId();
+      
       // For testing purposes, allow the DT secret manager to be started regardless
       // of whether security is enabled.
       alwaysUseDelegationTokensForTests = conf.getBoolean(
@@ -1067,8 +1100,8 @@ public class FSNamesystem implements Nam
     try {
       checkSuperuserPrivilege();
       File file = new File(System.getProperty("hadoop.log.dir"), filename);
-      PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(file,
-          true)));
+      PrintWriter out = new PrintWriter(new BufferedWriter(
+          new OutputStreamWriter(new FileOutputStream(file, true), Charsets.UTF_8)));
       metaSave(out);
       out.flush();
       out.close();
@@ -2520,13 +2553,9 @@ public class FSNamesystem implements Nam
   private Block allocateBlock(String src, INodesInPath inodesInPath,
       DatanodeDescriptor targets[]) throws IOException {
     assert hasWriteLock();
-    Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0); 
-    while(isValidBlock(b)) {
-      b.setBlockId(DFSUtil.getRandom().nextLong());
-    }
+    Block b = new Block(getFSImage().getUniqueBlockId(), 0, 0); 
     // Increment the generation stamp for every new block.
-    nextGenerationStamp();
-    b.setGenerationStamp(getGenerationStamp());
+    b.setGenerationStamp(nextGenerationStamp());
     b = dir.addBlock(src, inodesInPath, b, targets);
     NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
         + blockPoolId + " " + b);
@@ -4321,6 +4350,8 @@ public class FSNamesystem implements Nam
       case SAFEMODE_ENTER: // enter safe mode
         enterSafeMode(false);
         break;
+      default:
+        LOG.error("Unexpected safe mode action");
       }
     }
     return isInSafeMode();
@@ -4580,13 +4611,6 @@ public class FSNamesystem implements Nam
     }
   }
 
-  /**
-   * Returns whether the given block is one pointed-to by a file.
-   */
-  private boolean isValidBlock(Block b) {
-    return (blockManager.getBlockCollection(b) != null);
-  }
-
   PermissionStatus createFsOwnerPermissions(FsPermission permission) {
     return new PermissionStatus(fsOwner.getShortUserName(), supergroup, permission);
   }
@@ -4796,14 +4820,14 @@ public class FSNamesystem implements Nam
    * Sets the generation stamp for this filesystem
    */
   void setGenerationStamp(long stamp) {
-    generationStamp.setStamp(stamp);
+    generationStamp.setCurrentValue(stamp);
   }
 
   /**
    * Gets the generation stamp for this filesystem
    */
   long getGenerationStamp() {
-    return generationStamp.getStamp();
+    return generationStamp.getCurrentValue();
   }
 
   /**
@@ -4815,7 +4839,7 @@ public class FSNamesystem implements Nam
       throw new SafeModeException(
           "Cannot get next generation stamp", safeMode);
     }
-    long gs = generationStamp.nextStamp();
+    final long gs = generationStamp.nextValue();
     getEditLog().logGenerationStamp(gs);
     // NB: callers sync the log
     return gs;

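resetLastInodeId() funnels through skipTo(), which refuses to move the counter backwards and whose IllegalStateException is rethrown as an IOException, while the generation stamp is migrated onto the same getCurrentValue/setCurrentValue/nextValue vocabulary. A sketch of the counter contract this implies — the method names come from the diff, the body is an assumption:

    // Monotonic counter with an unchecked reset (for tests and image
    // loading) and a forward-only skip (for edit-log replay).
    class SequentialCounterSketch {
      private long currentValue;

      SequentialCounterSketch(long initialValue) {
        this.currentValue = initialValue;
      }

      long getCurrentValue()       { return currentValue; }
      void setCurrentValue(long v) { currentValue = v; }    // unchecked reset
      long nextValue()             { return ++currentValue; }

      /** Move forward only; replay must never reissue an already-used id. */
      void skipTo(long newValue) {
        if (newValue < currentValue) {
          throw new IllegalStateException("Cannot skip to a smaller value (current="
              + currentValue + ", requested=" + newValue + ")");
        }
        currentValue = newValue;
      }
    }
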
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Mon Jan 14 03:44:35 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.Arrays;
@@ -143,6 +144,11 @@ public abstract class INode implements C
   }
 
   /**
+   * The inode id.
+   */
+  private final long id;
+
+  /**
    *  The inode name is in java UTF8 encoding; 
    *  The name in HdfsFileStatus should keep the same encoding as this.
    *  if this encoding is changed, implicitly getFileInfo and listStatus in
@@ -161,8 +167,9 @@ public abstract class INode implements C
   private long modificationTime = 0L;
   private long accessTime = 0L;
 
-  private INode(byte[] name, long permission, INodeDirectory parent,
+  private INode(long id, byte[] name, long permission, INodeDirectory parent,
       long modificationTime, long accessTime) {
+    this.id = id;
     this.name = name;
     this.permission = permission;
     this.parent = parent;
@@ -170,26 +177,31 @@ public abstract class INode implements C
     this.accessTime = accessTime;
   }
 
-  INode(byte[] name, PermissionStatus permissions, INodeDirectory parent,
-      long modificationTime, long accessTime) {
-    this(name, PermissionStatusFormat.toLong(permissions), parent,
+  INode(long id, byte[] name, PermissionStatus permissions,
+      INodeDirectory parent, long modificationTime, long accessTime) {
+    this(id, name, PermissionStatusFormat.toLong(permissions), parent,
         modificationTime, accessTime);
   }
-
-  INode(PermissionStatus permissions, long mtime, long atime) {
-    this(null, permissions, null, mtime, atime);
+  
+  INode(long id, PermissionStatus permissions, long mtime, long atime) {
+    this(id, null, PermissionStatusFormat.toLong(permissions), null, mtime, atime);
   }
-
-  protected INode(String name, PermissionStatus permissions) {
-    this(DFSUtil.string2Bytes(name), permissions, null, 0L, 0L);
+  
+  protected INode(long id, String name, PermissionStatus permissions) {
+    this(id, DFSUtil.string2Bytes(name), permissions, null, 0L, 0L);
   }
   
   /** @param other Other node to be copied */
   INode(INode other) {
-    this(other.name, other.permission, other.parent, 
+    this(other.id, other.name, other.permission, other.parent, 
         other.modificationTime, other.accessTime);
   }
 
+  /** Get inode id */
+  public long getId() {
+    return this.id;
+  }
+  
   /**
    * Create a copy of this inode for snapshot.
    * 
@@ -598,6 +610,7 @@ public abstract class INode implements C
   /**
    * Create an INode; the inode's name is not set yet
    * 
+   * @param id preassigned inode id
    * @param permissions permissions
    * @param blocks blocks if a file
    * @param symlink symblic link if a symbolic link
@@ -609,7 +622,8 @@ public abstract class INode implements C
    * @param preferredBlockSize block size
    * @return an inode
    */
-  static INode newINode(PermissionStatus permissions,
+  static INode newINode(long id,
+                        PermissionStatus permissions,
                         BlockInfo[] blocks,
                         String symlink,
                         short replication,
@@ -619,17 +633,17 @@ public abstract class INode implements C
                         long dsQuota,
                         long preferredBlockSize) {
     if (symlink.length() != 0) { // check if symbolic link
-      return new INodeSymlink(symlink, modificationTime, atime, permissions);
+      return new INodeSymlink(id, symlink, modificationTime, atime, permissions);
     }  else if (blocks == null) { //not sym link and blocks null? directory!
       if (nsQuota >= 0 || dsQuota >= 0) {
         return new INodeDirectoryWithQuota(
-            permissions, modificationTime, nsQuota, dsQuota);
+             id, permissions, modificationTime, nsQuota, dsQuota);
       } 
       // regular directory
-      return new INodeDirectory(permissions, modificationTime);
+      return new INodeDirectory(id, permissions, modificationTime);
     }
     // file
-    return new INodeFile(permissions, blocks, replication,
+    return new INodeFile(id, permissions, blocks, replication,
         modificationTime, atime, preferredBlockSize);
   }
 

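Besides threading the preassigned id through every constructor, this hunk keeps newINode()'s dispatch rule in one place: a non-empty symlink target wins, null blocks means a directory (quota'd when either quota is set), and anything else is a regular file. A toy restatement of that rule, with simplified types, for illustration only:

    class NewInodeDispatchSketch {
      static String kindOf(String symlink, Object[] blocks, long nsQuota, long dsQuota) {
        if (symlink.length() != 0) {
          return "symlink";
        } else if (blocks == null) {
          return (nsQuota >= 0 || dsQuota >= 0) ? "directory with quota" : "directory";
        }
        return "file";                                         // regular file
      }

      public static void main(String[] args) {
        System.out.println(kindOf("", null, -1, -1));          // directory
        System.out.println(kindOf("", new Object[0], -1, -1)); // file
      }
    }
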
Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Mon Jan 14 03:44:35 2013
@@ -62,17 +62,17 @@ public class INodeDirectory extends INod
 
   private List<INode> children = null;
 
-  public INodeDirectory(String name, PermissionStatus permissions) {
-    super(name, permissions);
+  public INodeDirectory(long id, String name, PermissionStatus permissions) {
+    super(id, name, permissions);
   }
 
-  public INodeDirectory(PermissionStatus permissions, long mTime) {
-    super(permissions, mTime, 0);
+  public INodeDirectory(long id, PermissionStatus permissions, long mTime) {
+    super(id, permissions, mTime, 0);
   }
-
+  
   /** constructor */
-  INodeDirectory(byte[] name, PermissionStatus permissions, long mtime) {
-    super(name, permissions, null, mtime, 0L);
+  INodeDirectory(long id, byte[] name, PermissionStatus permissions, long mtime) {
+    super(id, name, permissions, null, mtime, 0L);
   }
   
   /**

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java Mon Jan 14 03:44:35 2013
@@ -55,16 +55,16 @@ public class INodeDirectoryWithQuota ext
   }
   
   /** constructor with no quota verification */
-  INodeDirectoryWithQuota(PermissionStatus permissions, long modificationTime,
-      long nsQuota, long dsQuota) {
-    super(permissions, modificationTime);
+  INodeDirectoryWithQuota(long id, PermissionStatus permissions,
+      long modificationTime, long nsQuota, long dsQuota) {
+    super(id, permissions, modificationTime);
     this.nsQuota = nsQuota;
     this.dsQuota = dsQuota;
   }
   
   /** constructor with no quota verification */
-  INodeDirectoryWithQuota(String name, PermissionStatus permissions) {
-    super(name, permissions);
+  INodeDirectoryWithQuota(long id, String name, PermissionStatus permissions) {
+    super(id, name, permissions);
   }
   
   /** Get this directory's namespace quota

Modified: hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1432796&r1=1432795&r2=1432796&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Mon Jan 14 03:44:35 2013
@@ -87,21 +87,21 @@ public class INodeFile extends INode imp
 
   private BlockInfo[] blocks;
 
-  INodeFile(PermissionStatus permissions, BlockInfo[] blklist,
+  INodeFile(long id, PermissionStatus permissions, BlockInfo[] blklist,
                       short replication, long modificationTime,
                       long atime, long preferredBlockSize) {
-    this(null, permissions, modificationTime, atime, blklist, replication,
+    this(id, null, permissions, modificationTime, atime, blklist, replication,
         preferredBlockSize);
   }
 
-  protected INodeFile(byte[] name, PermissionStatus permissions, long mtime, long atime,
+  protected INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
       BlockInfo[] blklist, short replication, long preferredBlockSize) {
-    super(name, permissions, null, mtime, atime);
+    super(id, name, permissions, null, mtime, atime);
     header = HeaderFormat.combineReplication(header, replication);
     header = HeaderFormat.combinePreferredBlockSize(header, preferredBlockSize);
     this.blocks = blklist;
   }
-
+  
   protected INodeFile(INodeFile that) {
     super(that);
     this.header = that.header;


