hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1555021 [5/15] - in /hadoop/common/branches/HDFS-5535/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache...
Date: Fri, 03 Jan 2014 07:27:01 GMT
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyWithNodeGroup.java Fri Jan  3 07:26:52 2014
@@ -25,6 +25,8 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.net.NetworkTopology;
@@ -64,81 +66,87 @@ public class BlockPlacementPolicyWithNod
    * @return the chosen node
    */
   @Override
-  protected DatanodeDescriptor chooseLocalNode(Node localMachine,
+  protected DatanodeStorageInfo chooseLocalStorage(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
-      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
-        throws NotEnoughReplicasException {
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      StorageType storageType) throws NotEnoughReplicasException {
     // if no local machine, randomly choose one node
     if (localMachine == null)
       return chooseRandom(NodeBase.ROOT, excludedNodes, 
-          blocksize, maxNodesPerRack, results, avoidStaleNodes);
+          blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
 
+    // otherwise try local machine first
     if (localMachine instanceof DatanodeDescriptor) {
       DatanodeDescriptor localDataNode = (DatanodeDescriptor)localMachine;
-      // otherwise try local machine first
       if (excludedNodes.add(localMachine)) { // was not in the excluded list
-        if (addIfIsGoodTarget(localDataNode, excludedNodes, blocksize,
-            maxNodesPerRack, false, results, avoidStaleNodes) >= 0) {
-          return localDataNode;
+        for(DatanodeStorageInfo localStorage : DFSUtil.shuffle(
+            localDataNode.getStorageInfos())) {
+          if (addIfIsGoodTarget(localStorage, excludedNodes, blocksize,
+              maxNodesPerRack, false, results, avoidStaleNodes, storageType) >= 0) {
+            return localStorage;
+          }
         }
       }
     }
 
     // try a node on local node group
-    DatanodeDescriptor chosenNode = chooseLocalNodeGroup(
+    DatanodeStorageInfo chosenStorage = chooseLocalNodeGroup(
         (NetworkTopologyWithNodeGroup)clusterMap, localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results, avoidStaleNodes);
-    if (chosenNode != null) {
-      return chosenNode;
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+    if (chosenStorage != null) {
+      return chosenStorage;
     }
     // try a node on local rack
     return chooseLocalRack(localMachine, excludedNodes, 
-        blocksize, maxNodesPerRack, results, avoidStaleNodes);
+        blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
+  }
+
+  /** @return the node of the second replica */
+  private static DatanodeDescriptor secondNode(Node localMachine,
+      List<DatanodeStorageInfo> results) {
+    // find the second replica
+    for(DatanodeStorageInfo nextStorage : results) {
+      DatanodeDescriptor nextNode = nextStorage.getDatanodeDescriptor();
+      if (nextNode != localMachine) {
+        return nextNode;
+      }
+    }
+    return null;
   }
 
-  
   @Override
-  protected DatanodeDescriptor chooseLocalRack(Node localMachine,
+  protected DatanodeStorageInfo chooseLocalRack(Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
-      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
-      throws NotEnoughReplicasException {
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      StorageType storageType) throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-                          blocksize, maxNodesPerRack, results, 
-                          avoidStaleNodes);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageType);
     }
 
     // choose one from the local rack, but off-nodegroup
     try {
-      return chooseRandom(NetworkTopology.getFirstHalf(
-                              localMachine.getNetworkLocation()),
-                          excludedNodes, blocksize, 
-                          maxNodesPerRack, results, 
-                          avoidStaleNodes);
+      final String scope = NetworkTopology.getFirstHalf(localMachine.getNetworkLocation());
+      return chooseRandom(scope, excludedNodes, blocksize, maxNodesPerRack,
+          results, avoidStaleNodes, storageType);
     } catch (NotEnoughReplicasException e1) {
       // find the second replica
-      DatanodeDescriptor newLocal=null;
-      for(DatanodeDescriptor nextNode : results) {
-        if (nextNode != localMachine) {
-          newLocal = nextNode;
-          break;
-        }
-      }
+      final DatanodeDescriptor newLocal = secondNode(localMachine, results);
       if (newLocal != null) {
         try {
           return chooseRandom(
               clusterMap.getRack(newLocal.getNetworkLocation()), excludedNodes,
-              blocksize, maxNodesPerRack, results, avoidStaleNodes);
+              blocksize, maxNodesPerRack, results, avoidStaleNodes, storageType);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes);
+              maxNodesPerRack, results, avoidStaleNodes, storageType);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes);
+            maxNodesPerRack, results, avoidStaleNodes, storageType);
       }
     }
   }
@@ -146,8 +154,9 @@ public class BlockPlacementPolicyWithNod
   @Override
   protected void chooseRemoteRack(int numOfReplicas,
       DatanodeDescriptor localMachine, Set<Node> excludedNodes,
-      long blocksize, int maxReplicasPerRack, List<DatanodeDescriptor> results,
-      boolean avoidStaleNodes) throws NotEnoughReplicasException {
+      long blocksize, int maxReplicasPerRack, List<DatanodeStorageInfo> results,
+      boolean avoidStaleNodes, StorageType storageType)
+          throws NotEnoughReplicasException {
     int oldNumOfReplicas = results.size();
 
     final String rackLocation = NetworkTopology.getFirstHalf(
@@ -155,12 +164,12 @@ public class BlockPlacementPolicyWithNod
     try {
       // randomly choose from remote racks
       chooseRandom(numOfReplicas, "~" + rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes);
+          maxReplicasPerRack, results, avoidStaleNodes, storageType);
     } catch (NotEnoughReplicasException e) {
       // fall back to the local rack
       chooseRandom(numOfReplicas - (results.size() - oldNumOfReplicas),
           rackLocation, excludedNodes, blocksize,
-          maxReplicasPerRack, results, avoidStaleNodes);
+          maxReplicasPerRack, results, avoidStaleNodes, storageType);
     }
   }
 
@@ -170,46 +179,40 @@ public class BlockPlacementPolicyWithNod
    * if still no such node is available, choose a random node in the cluster.
    * @return the chosen node
    */
-  private DatanodeDescriptor chooseLocalNodeGroup(
+  private DatanodeStorageInfo chooseLocalNodeGroup(
       NetworkTopologyWithNodeGroup clusterMap, Node localMachine,
       Set<Node> excludedNodes, long blocksize, int maxNodesPerRack,
-      List<DatanodeDescriptor> results, boolean avoidStaleNodes)
-      throws NotEnoughReplicasException {
+      List<DatanodeStorageInfo> results, boolean avoidStaleNodes,
+      StorageType storageType) throws NotEnoughReplicasException {
     // no local machine, so choose a random machine
     if (localMachine == null) {
-      return chooseRandom(NodeBase.ROOT, excludedNodes, 
-      blocksize, maxNodesPerRack, results, avoidStaleNodes);
+      return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
+          maxNodesPerRack, results, avoidStaleNodes, storageType);
     }
 
     // choose one from the local node group
     try {
       return chooseRandom(
           clusterMap.getNodeGroup(localMachine.getNetworkLocation()),
-          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes);
+          excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
+          storageType);
     } catch (NotEnoughReplicasException e1) {
-      // find the second replica
-      DatanodeDescriptor newLocal=null;
-      for(DatanodeDescriptor nextNode : results) {
-        if (nextNode != localMachine) {
-          newLocal = nextNode;
-          break;
-        }
-      }
+      final DatanodeDescriptor newLocal = secondNode(localMachine, results);
       if (newLocal != null) {
         try {
           return chooseRandom(
               clusterMap.getNodeGroup(newLocal.getNetworkLocation()),
               excludedNodes, blocksize, maxNodesPerRack, results,
-              avoidStaleNodes);
+              avoidStaleNodes, storageType);
         } catch(NotEnoughReplicasException e2) {
           //otherwise randomly choose one from the network
           return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-              maxNodesPerRack, results, avoidStaleNodes);
+              maxNodesPerRack, results, avoidStaleNodes, storageType);
         }
       } else {
         //otherwise randomly choose one from the network
         return chooseRandom(NodeBase.ROOT, excludedNodes, blocksize,
-            maxNodesPerRack, results, avoidStaleNodes);
+            maxNodesPerRack, results, avoidStaleNodes, storageType);
       }
     }
   }

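For context on the change above (not part of the diff): chooseLocalStorage now walks a shuffled copy of the local node's storages and returns the first acceptable one, falling back to the node group, the rack, and finally a random node. A minimal, self-contained sketch of that selection pattern, using a hypothetical Storage class and a free-space check in place of the real DatanodeStorageInfo/addIfIsGoodTarget types:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LocalStoragePicker {
  /** Hypothetical stand-in for DatanodeStorageInfo. */
  static class Storage {
    final String id;
    final long remaining;
    Storage(String id, long remaining) { this.id = id; this.remaining = remaining; }
  }

  /** Return the first shuffled storage with room for the block, or null to fall back. */
  static Storage chooseLocalStorage(List<Storage> localStorages, long blocksize) {
    List<Storage> shuffled = new ArrayList<Storage>(localStorages);
    Collections.shuffle(shuffled);        // same idea as DFSUtil.shuffle(...)
    for (Storage s : shuffled) {
      if (s.remaining >= blocksize) {     // stand-in for addIfIsGoodTarget(...) >= 0
        return s;
      }
    }
    return null;                          // caller then tries node group, rack, then random
  }

  public static void main(String[] args) {
    List<Storage> storages = new ArrayList<Storage>();
    storages.add(new Storage("DS-1", 1L << 20));
    storages.add(new Storage("DS-2", 1L << 30));
    Storage chosen = chooseLocalStorage(storages, 64L << 20);
    System.out.println(chosen == null ? "fall back" : "chose " + chosen.id);
  }
}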
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java Fri Jan  3 07:26:52 2014
@@ -30,11 +30,11 @@ import org.apache.hadoop.util.LightWeigh
  * the datanodes that store the block.
  */
 class BlocksMap {
-  private static class NodeIterator implements Iterator<DatanodeDescriptor> {
+  private static class StorageIterator implements Iterator<DatanodeStorageInfo> {
     private BlockInfo blockInfo;
     private int nextIdx = 0;
       
-    NodeIterator(BlockInfo blkInfo) {
+    StorageIterator(BlockInfo blkInfo) {
       this.blockInfo = blkInfo;
     }
 
@@ -45,8 +45,8 @@ class BlocksMap {
     }
 
     @Override
-    public DatanodeDescriptor next() {
-      return blockInfo.getDatanode(nextIdx++);
+    public DatanodeStorageInfo next() {
+      return blockInfo.getStorageInfo(nextIdx++);
     }
 
     @Override
@@ -129,18 +129,23 @@ class BlocksMap {
 
   /**
    * Searches for the block in the BlocksMap and 
-   * returns Iterator that iterates through the nodes the block belongs to.
+   * returns {@link Iterable} of the storages the block belongs to.
    */
-  Iterator<DatanodeDescriptor> nodeIterator(Block b) {
-    return nodeIterator(blocks.get(b));
+  Iterable<DatanodeStorageInfo> getStorages(Block b) {
+    return getStorages(blocks.get(b));
   }
 
   /**
    * For a block that has already been retrieved from the BlocksMap
-   * returns Iterator that iterates through the nodes the block belongs to.
+   * returns {@link Iterable} of the storages the block belongs to.
    */
-  Iterator<DatanodeDescriptor> nodeIterator(BlockInfo storedBlock) {
-    return new NodeIterator(storedBlock);
+  Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
+    return new Iterable<DatanodeStorageInfo>() {
+      @Override
+      public Iterator<DatanodeStorageInfo> iterator() {
+        return new StorageIterator(storedBlock);
+      }
+    };
   }
 
   /** counts number of containing nodes. Better than using iterator. */

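For context (not part of the diff): the BlocksMap change above replaces the raw Iterator with an Iterable so callers can use the enhanced for loop. A self-contained sketch of that wrapping pattern, with a hypothetical StorageList holder standing in for BlocksMap/BlockInfo:

import java.util.Iterator;
import java.util.NoSuchElementException;

public class StorageList {
  private final String[] storageIds;   // hypothetical stand-in for the per-block storage entries

  StorageList(String... storageIds) {
    this.storageIds = storageIds;
  }

  /** Like BlocksMap.getStorages(...): wrap the index-based iterator in an Iterable. */
  Iterable<String> getStorages() {
    return new Iterable<String>() {
      @Override
      public Iterator<String> iterator() {
        return new Iterator<String>() {
          private int nextIdx = 0;
          @Override
          public boolean hasNext() { return nextIdx < storageIds.length; }
          @Override
          public String next() {
            if (!hasNext()) throw new NoSuchElementException();
            return storageIds[nextIdx++];
          }
          @Override
          public void remove() { throw new UnsupportedOperationException(); }
        };
      }
    };
  }

  public static void main(String[] args) {
    StorageList list = new StorageList("DS-1", "DS-2");
    for (String id : list.getStorages()) {   // callers switch from hasNext()/next() to for-each
      System.out.println(id);
    }
  }
}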
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Fri Jan  3 07:26:52 2014
@@ -27,6 +27,9 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,6 +51,8 @@ import org.apache.hadoop.hdfs.util.ReadO
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Scans the namesystem, scheduling blocks to be cached as appropriate.
  *
@@ -79,25 +84,47 @@ public class CacheReplicationMonitor ext
   private final long intervalMs;
 
   /**
-   * True if we should rescan immediately, regardless of how much time
-   * elapsed since the previous scan.
+   * The CacheReplicationMonitor (CRM) lock. Used to synchronize starting and
+   * waiting for rescan operations.
    */
-  private boolean rescanImmediately;
+  private final ReentrantLock lock;
 
   /**
-   * The monotonic time at which the current scan started.
+   * Notifies the scan thread that an immediate rescan is needed.
    */
-  private long scanTimeMs;
+  private final Condition doRescan;
 
   /**
-   * Mark status of the current scan.
+   * Notifies waiting threads that a rescan has finished.
    */
-  private boolean mark = false;
+  private final Condition scanFinished;
+
+  /**
+   * Whether there are pending CacheManager operations that necessitate a
+   * CacheReplicationMonitor rescan. Protected by the CRM lock.
+   */
+  private boolean needsRescan = true;
+
+  /**
+   * Whether we are currently doing a rescan. Protected by the CRM lock.
+   */
+  private boolean isScanning = false;
+
+  /**
+   * The number of rescans completed. Used to wait for scans to finish.
+   * Protected by the CacheReplicationMonitor lock.
+   */
+  private long scanCount = 0;
+
+  /**
+   * True if this monitor should terminate. Protected by the CRM lock.
+   */
+  private boolean shutdown = false;
 
   /**
-   * True if this monitor should terminate.
+   * Mark status of the current scan.
    */
-  private boolean shutdown;
+  private boolean mark = false;
 
   /**
    * Cache directives found in the previous scan.
@@ -108,55 +135,74 @@ public class CacheReplicationMonitor ext
    * Blocks found in the previous scan.
    */
   private long scannedBlocks;
-  
+
   public CacheReplicationMonitor(FSNamesystem namesystem,
-      CacheManager cacheManager, long intervalMs) {
+      CacheManager cacheManager, long intervalMs, ReentrantLock lock) {
     this.namesystem = namesystem;
     this.blockManager = namesystem.getBlockManager();
     this.cacheManager = cacheManager;
     this.cachedBlocks = cacheManager.getCachedBlocks();
     this.intervalMs = intervalMs;
+    this.lock = lock;
+    this.doRescan = this.lock.newCondition();
+    this.scanFinished = this.lock.newCondition();
   }
 
   @Override
   public void run() {
-    shutdown = false;
-    rescanImmediately = true;
-    scanTimeMs = 0;
+    long startTimeMs = 0;
+    Thread.currentThread().setName("CacheReplicationMonitor(" +
+        System.identityHashCode(this) + ")");
     LOG.info("Starting CacheReplicationMonitor with interval " +
              intervalMs + " milliseconds");
     try {
       long curTimeMs = Time.monotonicNow();
       while (true) {
-        synchronized(this) {
+        lock.lock();
+        try {
           while (true) {
             if (shutdown) {
               LOG.info("Shutting down CacheReplicationMonitor");
               return;
             }
-            if (rescanImmediately) {
-              LOG.info("Rescanning on request");
-              rescanImmediately = false;
+            if (needsRescan) {
+              LOG.info("Rescanning because of pending operations");
               break;
             }
-            long delta = (scanTimeMs + intervalMs) - curTimeMs;
+            long delta = (startTimeMs + intervalMs) - curTimeMs;
             if (delta <= 0) {
-              LOG.info("Rescanning after " + (curTimeMs - scanTimeMs) +
+              LOG.info("Rescanning after " + (curTimeMs - startTimeMs) +
                   " milliseconds");
               break;
             }
-            this.wait(delta);
+            doRescan.await(delta, TimeUnit.MILLISECONDS);
             curTimeMs = Time.monotonicNow();
           }
+          isScanning = true;
+          needsRescan = false;
+        } finally {
+          lock.unlock();
         }
-        scanTimeMs = curTimeMs;
+        startTimeMs = curTimeMs;
         mark = !mark;
         rescan();
         curTimeMs = Time.monotonicNow();
+        // Update synchronization-related variables.
+        lock.lock();
+        try {
+          isScanning = false;
+          scanCount++;
+          scanFinished.signalAll();
+        } finally {
+          lock.unlock();
+        }
         LOG.info("Scanned " + scannedDirectives + " directive(s) and " +
-            scannedBlocks + " block(s) in " + (curTimeMs - scanTimeMs) + " " +
+            scannedBlocks + " block(s) in " + (curTimeMs - startTimeMs) + " " +
             "millisecond(s).");
       }
+    } catch (InterruptedException e) {
+      LOG.info("Shutting down CacheReplicationMonitor.");
+      return;
     } catch (Throwable t) {
       LOG.fatal("Thread exiting", t);
       terminate(1, t);
@@ -164,41 +210,80 @@ public class CacheReplicationMonitor ext
   }
 
   /**
-   * Kick the monitor thread.
-   * 
-   * If it is sleeping, it will wake up and start scanning.
-   * If it is currently scanning, it will finish the scan and immediately do 
-   * another one.
+   * Waits for a rescan to complete. This doesn't guarantee consistency with
+   * pending operations, only relative recency, since it will not force a new
+   * rescan if a rescan is already underway.
+   * <p>
+   * Note that this call will release the FSN lock, so operations before and
+   * after are not atomic.
+   */
+  public void waitForRescanIfNeeded() {
+    Preconditions.checkArgument(!namesystem.hasWriteLock(),
+        "Must not hold the FSN write lock when waiting for a rescan.");
+    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
+        "Must hold the CRM lock when waiting for a rescan.");
+    if (!needsRescan) {
+      return;
+    }
+    // If no scan is already ongoing, mark the CRM as dirty and kick
+    if (!isScanning) {
+      doRescan.signal();
+    }
+    // Wait until the scan finishes and the count advances
+    final long startCount = scanCount;
+    while ((!shutdown) && (startCount >= scanCount)) {
+      try {
+        scanFinished.await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
+            + " rescan", e);
+        break;
+      }
+    }
+  }
+
+  /**
+   * Indicates to the CacheReplicationMonitor that there have been CacheManager
+   * changes that require a rescan.
    */
-  public synchronized void kick() {
-    rescanImmediately = true;
-    this.notifyAll();
+  public void setNeedsRescan() {
+    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
+        "Must hold the CRM lock when setting the needsRescan bit.");
+    this.needsRescan = true;
   }
 
   /**
-   * Shut down and join the monitor thread.
+   * Shut down the monitor thread.
    */
   @Override
   public void close() throws IOException {
-    synchronized(this) {
+    Preconditions.checkArgument(namesystem.hasWriteLock());
+    lock.lock();
+    try {
       if (shutdown) return;
+      // Since we hold both the FSN write lock and the CRM lock here,
+      // we know that the CRM thread cannot be currently modifying
+      // the cache manager state while we're closing it.
+      // Since the CRM thread checks the value of 'shutdown' after waiting
+      // for a lock, we know that the thread will not modify the cache
+      // manager state after this point.
       shutdown = true;
-      this.notifyAll();
-    }
-    try {
-      if (this.isAlive()) {
-        this.join(60000);
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
+      doRescan.signalAll();
+      scanFinished.signalAll();
+    } finally {
+      lock.unlock();
     }
   }
 
-  private void rescan() {
+  private void rescan() throws InterruptedException {
     scannedDirectives = 0;
     scannedBlocks = 0;
     namesystem.writeLock();
     try {
+      if (shutdown) {
+        throw new InterruptedException("CacheReplicationMonitor was " +
+            "shut down.");
+      }
       resetStatistics();
       rescanCacheDirectives();
       rescanCachedBlockMap();
@@ -228,12 +313,14 @@ public class CacheReplicationMonitor ext
       // Reset the directive's statistics
       directive.resetStatistics();
       // Skip processing this entry if it has expired
-      LOG.info("Directive expiry is at " + directive.getExpiryTime());
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Directive expiry is at " + directive.getExpiryTime());
+      }
       if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Skipping directive id " + directive.getId()
-              + " because it has expired (" + directive.getExpiryTime() + ">="
-              + now);
+              + " because it has expired (" + directive.getExpiryTime() + "<="
+              + now + ")");
         }
         continue;
       }
@@ -280,15 +367,27 @@ public class CacheReplicationMonitor ext
 
     // Increment the "needed" statistics
     directive.addFilesNeeded(1);
-    long neededTotal = 0;
-    for (BlockInfo blockInfo : blockInfos) {
-      long neededByBlock = 
-          directive.getReplication() * blockInfo.getNumBytes();
-       neededTotal += neededByBlock;
-    }
+    // We don't cache UC blocks, don't add them to the total here
+    long neededTotal = file.computeFileSizeNotIncludingLastUcBlock() *
+        directive.getReplication();
     directive.addBytesNeeded(neededTotal);
 
-    // TODO: Enforce per-pool quotas
+    // The pool's bytesNeeded is incremented as we scan. If the demand
+    // thus far plus the demand of this file would exceed the pool's limit,
+    // do not cache this file.
+    CachePool pool = directive.getPool();
+    if (pool.getBytesNeeded() > pool.getLimit()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Skipping directive id %d file %s because "
+            + "limit of pool %s would be exceeded (%d > %d)",
+            directive.getId(),
+            file.getFullPathName(),
+            pool.getPoolName(),
+            pool.getBytesNeeded(),
+            pool.getLimit()));
+      }
+      return;
+    }
 
     long cachedTotal = 0;
     for (BlockInfo blockInfo : blockInfos) {
@@ -315,14 +414,21 @@ public class CacheReplicationMonitor ext
             directive.getReplication()) * blockInfo.getNumBytes();
         cachedTotal += cachedByBlock;
 
-        if (mark != ocblock.getMark()) {
-          // Mark hasn't been set in this scan, so update replication and mark.
+        if ((mark != ocblock.getMark()) ||
+            (ocblock.getReplication() < directive.getReplication())) {
+          //
+          // Overwrite the block's replication and mark in two cases:
+          //
+          // 1. If the mark on the CachedBlock is different from the mark for
+          // this scan, that means the block hasn't been updated during this
+          // scan, and we should overwrite whatever is there, since it is no
+          // longer valid.
+          //
+          // 2. If the replication in the CachedBlock is less than what the
+          // directive asks for, we want to increase the block's replication
+          // field to what the directive asks for.
+          //
           ocblock.setReplicationAndMark(directive.getReplication(), mark);
-        } else {
-          // Mark already set in this scan.  Set replication to highest value in
-          // any CacheDirective that covers this file.
-          ocblock.setReplicationAndMark((short)Math.max(
-              directive.getReplication(), ocblock.getReplication()), mark);
         }
       }
     }
@@ -338,6 +444,36 @@ public class CacheReplicationMonitor ext
     }
   }
 
+  private String findReasonForNotCaching(CachedBlock cblock, 
+          BlockInfo blockInfo) {
+    if (blockInfo == null) {
+      // Somehow, a cache report with the block arrived, but the block
+      // reports from the DataNode haven't (yet?) described such a block.
+      // Alternately, the NameNode might have invalidated the block, but the
+      // DataNode hasn't caught up.  In any case, we want to tell the DN
+      // to uncache this.
+      return "not tracked by the BlockManager";
+    } else if (!blockInfo.isComplete()) {
+      // When a cached block changes state from complete to some other state
+      // on the DataNode (perhaps because of append), it will begin the
+      // uncaching process.  However, the uncaching process is not
+      // instantaneous, especially if clients have pinned the block.  So
+      // there may be a period of time when incomplete blocks remain cached
+      // on the DataNodes.
+      return "not complete";
+    }  else if (cblock.getReplication() == 0) {
+      // Since 0 is not a valid value for a cache directive's replication
+      // field, seeing a replication of 0 on a CacheBlock means that it
+      // has never been reached by any sweep.
+      return "not needed by any directives";
+    } else if (cblock.getMark() != mark) { 
+      // Although the block was needed in the past, we didn't reach it during
+      // the current sweep.  Therefore, it doesn't need to be cached any more.
+      return "no longer needed by any directives";
+    }
+    return null;
+  }
+
   /**
    * Scan through the cached block map.
    * Any blocks which are under-replicated should be assigned new Datanodes.
@@ -363,11 +499,17 @@ public class CacheReplicationMonitor ext
           iter.remove();
         }
       }
-      // If the block's mark doesn't match with the mark of this scan, that
-      // means that this block couldn't be reached during this scan.  That means
-      // it doesn't need to be cached any more.
-      int neededCached = (cblock.getMark() != mark) ?
-          0 : cblock.getReplication();
+      BlockInfo blockInfo = blockManager.
+            getStoredBlock(new Block(cblock.getBlockId()));
+      String reason = findReasonForNotCaching(cblock, blockInfo);
+      int neededCached = 0;
+      if (reason != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("not caching " + cblock + " because it is " + reason);
+        }
+      } else {
+        neededCached = cblock.getReplication();
+      }
       int numCached = cached.size();
       if (numCached >= neededCached) {
         // If we have enough replicas, drop all pending cached.
@@ -421,9 +563,6 @@ public class CacheReplicationMonitor ext
   private void addNewPendingUncached(int neededUncached,
       CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
       List<DatanodeDescriptor> pendingUncached) {
-    if (!cacheManager.isActive()) {
-      return;
-    }
     // Figure out which replicas can be uncached.
     LinkedList<DatanodeDescriptor> possibilities =
         new LinkedList<DatanodeDescriptor>();
@@ -459,16 +598,15 @@ public class CacheReplicationMonitor ext
   private void addNewPendingCached(int neededCached,
       CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
       List<DatanodeDescriptor> pendingCached) {
-    if (!cacheManager.isActive()) {
-      return;
-    }
     // To figure out which replicas can be cached, we consult the
     // blocksMap.  We don't want to try to cache a corrupt replica, though.
     BlockInfo blockInfo = blockManager.
           getStoredBlock(new Block(cachedBlock.getBlockId()));
     if (blockInfo == null) {
-      LOG.debug("Not caching block " + cachedBlock + " because it " +
-          "was deleted from all DataNodes.");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not caching block " + cachedBlock + " because there " +
+            "is no record of it on the NameNode.");
+      }
       return;
     }
     if (!blockInfo.isComplete()) {

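For context (not part of the diff): the monitor's synchronized/wait/notify scheme is replaced by a ReentrantLock with two Conditions, so callers can set a needsRescan flag, kick the scan thread, and block until scanCount advances. A minimal sketch of that handshake with hypothetical names; in the commit the lock is shared with the CacheManager rather than created internally:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class RescanCoordinator {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition doRescan = lock.newCondition();
  private final Condition scanFinished = lock.newCondition();
  private boolean needsRescan = false;
  private boolean isScanning = false;
  private long scanCount = 0;

  /** Scan-thread side: wait for a request (or the interval), scan, then publish completion. */
  public void runOneScan(long intervalMs) throws InterruptedException {
    lock.lock();
    try {
      if (!needsRescan) {
        // The real loop re-checks shutdown and elapsed time; this sketch waits once.
        doRescan.await(intervalMs, TimeUnit.MILLISECONDS);
      }
      isScanning = true;
      needsRescan = false;
    } finally {
      lock.unlock();
    }

    // ... do the actual rescan here, outside the lock ...

    lock.lock();
    try {
      isScanning = false;
      scanCount++;
      scanFinished.signalAll();   // wake any waitForRescanIfNeeded() callers
    } finally {
      lock.unlock();
    }
  }

  /** Caller side: mark the state dirty so the next scan picks up pending changes. */
  public void setNeedsRescan() {
    lock.lock();
    try {
      needsRescan = true;
    } finally {
      lock.unlock();
    }
  }

  /** Caller side: kick the scan thread if idle and block until a scan completes. */
  public void waitForRescanIfNeeded() throws InterruptedException {
    lock.lock();
    try {
      if (!needsRescan) {
        return;
      }
      if (!isScanning) {
        doRescan.signal();
      }
      long startCount = scanCount;
      while (startCount >= scanCount) {
        scanFinished.await();
      }
    } finally {
      lock.unlock();
    }
  }
}

Using two separate Conditions keeps rescan requests and completion notifications independent, so a caller waiting on scanFinished is never woken by its own doRescan signal.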
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Fri Jan  3 07:26:52 2014
@@ -18,23 +18,29 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
 
-import com.google.common.annotations.VisibleForTesting;
-
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -43,6 +49,7 @@ import com.google.common.annotations.Vis
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class DatanodeDescriptor extends DatanodeInfo {
+  public static final Log LOG = LogFactory.getLog(DatanodeDescriptor.class);
   public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
 
   // Stores status of decommissioning.
@@ -54,9 +61,9 @@ public class DatanodeDescriptor extends 
   @InterfaceStability.Evolving
   public static class BlockTargetPair {
     public final Block block;
-    public final DatanodeDescriptor[] targets;    
+    public final DatanodeStorageInfo[] targets;    
 
-    BlockTargetPair(Block block, DatanodeDescriptor[] targets) {
+    BlockTargetPair(Block block, DatanodeStorageInfo[] targets) {
       this.block = block;
       this.targets = targets;
     }
@@ -99,6 +106,9 @@ public class DatanodeDescriptor extends 
     }
   }
 
+  private final Map<String, DatanodeStorageInfo> storageMap = 
+      new HashMap<String, DatanodeStorageInfo>();
+
   /**
    * A list of CachedBlock objects on this datanode.
    */
@@ -164,37 +174,11 @@ public class DatanodeDescriptor extends 
    */
   private long lastCachingDirectiveSentTimeMs;
 
-  /**
-   * Head of the list of blocks on the datanode
-   */
-  private volatile BlockInfo blockList = null;
-  /**
-   * Number of blocks on the datanode
-   */
-  private int numBlocks = 0;
-
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   public boolean isAlive = false;
   public boolean needKeyUpdate = false;
 
-  /**
-   * Set to false on any NN failover, and reset to true
-   * whenever a block report is received.
-   */
-  private boolean heartbeatedSinceFailover = false;
-  
-  /**
-   * At startup or at any failover, the DNs in the cluster may
-   * have pending block deletions from a previous incarnation
-   * of the NameNode. Thus, we consider their block contents
-   * stale until we have received a block report. When a DN
-   * is considered stale, any replicas on it are transitively
-   * considered stale. If any block has at least one stale replica,
-   * then no invalidations will be processed for this block.
-   * See HDFS-1972.
-   */
-  private boolean blockContentsStale = true;
   
   // A system administrator can tune the balancer bandwidth parameter
   // (dfs.balance.bandwidthPerSec) dynamically by calling
@@ -213,7 +197,7 @@ public class DatanodeDescriptor extends 
   private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
 
   /* Variables for maintaining number of blocks scheduled to be written to
-   * this datanode. This count is approximate and might be slightly bigger
+   * this storage. This count is approximate and might be slightly bigger
    * in case of errors (e.g. datanode does not report if an error occurs
    * while writing the block).
    */
@@ -223,9 +207,6 @@ public class DatanodeDescriptor extends 
   private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
   
-  /** Set to false after processing first block report */
-  private boolean firstBlockReport = true;
-  
   /** 
    * When set to true, the node is not in include list and is not allowed
    * to communicate with the namenode
@@ -237,7 +218,8 @@ public class DatanodeDescriptor extends 
    * @param nodeID id of the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID) {
-    this(nodeID, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
+    super(nodeID);
+    updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
   }
 
   /**
@@ -247,104 +229,60 @@ public class DatanodeDescriptor extends 
    */
   public DatanodeDescriptor(DatanodeID nodeID, 
                             String networkLocation) {
-    this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
-  }
-  
-  /**
-   * DatanodeDescriptor constructor
-   * @param nodeID id of the data node
-   * @param capacity capacity of the data node
-   * @param dfsUsed space used by the data node
-   * @param remaining remaining capacity of the data node
-   * @param bpused space used by the block pool corresponding to this namenode
-   * @param cacheCapacity cache capacity of the data node
-   * @param cacheUsed cache used on the data node
-   * @param xceiverCount # of data transfers at the data node
-   */
-  public DatanodeDescriptor(DatanodeID nodeID, 
-                            long capacity,
-                            long dfsUsed,
-                            long remaining,
-                            long bpused,
-                            long cacheCapacity,
-                            long cacheUsed,
-                            int xceiverCount,
-                            int failedVolumes) {
-    super(nodeID);
-    updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
-        cacheUsed, xceiverCount, failedVolumes);
-  }
-
-  /**
-   * DatanodeDescriptor constructor
-   * @param nodeID id of the data node
-   * @param networkLocation location of the data node in network
-   * @param capacity capacity of the data node, including space used by non-dfs
-   * @param dfsUsed the used space by dfs datanode
-   * @param remaining remaining capacity of the data node
-   * @param bpused space used by the block pool corresponding to this namenode
-   * @param cacheCapacity cache capacity of the data node
-   * @param cacheUsed cache used on the data node
-   * @param xceiverCount # of data transfers at the data node
-   */
-  public DatanodeDescriptor(DatanodeID nodeID,
-                            String networkLocation,
-                            long capacity,
-                            long dfsUsed,
-                            long remaining,
-                            long bpused,
-                            long cacheCapacity,
-                            long cacheUsed,
-                            int xceiverCount,
-                            int failedVolumes) {
     super(nodeID, networkLocation);
-    updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
-        cacheUsed, xceiverCount, failedVolumes);
+    updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
   }
 
   /**
-   * Add datanode to the block.
-   * Add block to the head of the list of blocks belonging to the data-node.
+   * Add data-node to the block. Add block to the head of the list of blocks
+   * belonging to the data-node.
    */
-  public boolean addBlock(BlockInfo b) {
-    if(!b.addNode(this))
-      return false;
-    // add to the head of the data-node list
-    blockList = b.listInsert(blockList, this);
-    numBlocks++;
-    return true;
+  public boolean addBlock(String storageID, BlockInfo b) {
+    DatanodeStorageInfo s = getStorageInfo(storageID);
+    if (s != null) {
+      return s.addBlock(b);
+    }
+    return false;
   }
-  
-  /**
-   * Remove block from the list of blocks belonging to the data-node.
-   * Remove datanode from the block.
-   */
-  public boolean removeBlock(BlockInfo b) {
-    blockList = b.listRemove(blockList, this);
-    if ( b.removeNode(this) ) {
-      numBlocks--;
-      return true;
-    } else {
-      return false;
+
+  DatanodeStorageInfo getStorageInfo(String storageID) {
+    synchronized (storageMap) {
+      return storageMap.get(storageID);
+    }
+  }
+  DatanodeStorageInfo[] getStorageInfos() {
+    synchronized (storageMap) {
+      final Collection<DatanodeStorageInfo> storages = storageMap.values();
+      return storages.toArray(new DatanodeStorageInfo[storages.size()]);
     }
   }
 
   /**
-   * Move block to the head of the list of blocks belonging to the data-node.
-   * @return the index of the head of the blockList
+   * Remove block from the list of blocks belonging to the data-node. Remove
+   * data-node from the block.
    */
-  int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
-    blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
-    return curIndex;
+  boolean removeBlock(BlockInfo b) {
+    int index = b.findStorageInfo(this);
+    // if block exists on this datanode
+    if (index >= 0) {
+      DatanodeStorageInfo s = b.getStorageInfo(index);
+      if (s != null) {
+        return s.removeBlock(b);
+      }
+    }
+    return false;
   }
-
+  
   /**
-   * Used for testing only
-   * @return the head of the blockList
+   * Remove block from the list of blocks belonging to the data-node. Remove
+   * data-node from the block.
    */
-  @VisibleForTesting
-  protected BlockInfo getHead(){
-    return blockList;
+  boolean removeBlock(String storageID, BlockInfo b) {
+    DatanodeStorageInfo s = getStorageInfo(storageID);
+    if (s != null) {
+      return s.removeBlock(b);
+    }
+    return false;
   }
 
   /**
@@ -355,9 +293,12 @@ public class DatanodeDescriptor extends 
    * @return the new block
    */
   public BlockInfo replaceBlock(BlockInfo oldBlock, BlockInfo newBlock) {
-    boolean done = removeBlock(oldBlock);
+    int index = oldBlock.findStorageInfo(this);
+    DatanodeStorageInfo s = oldBlock.getStorageInfo(index);
+    boolean done = s.removeBlock(oldBlock);
     assert done : "Old block should belong to the data-node when replacing";
-    done = addBlock(newBlock);
+
+    done = s.addBlock(newBlock);
     assert done : "New block should not belong to the data-node when replacing";
     return newBlock;
   }
@@ -368,7 +309,6 @@ public class DatanodeDescriptor extends 
     setBlockPoolUsed(0);
     setDfsUsed(0);
     setXceiverCount(0);
-    this.blockList = null;
     this.invalidateBlocks.clear();
     this.volumeFailures = 0;
     // pendingCached, cached, and pendingUncached are protected by the
@@ -392,66 +332,97 @@ public class DatanodeDescriptor extends 
   }
 
   public int numBlocks() {
-    return numBlocks;
+    int blocks = 0;
+    for (DatanodeStorageInfo entry : getStorageInfos()) {
+      blocks += entry.numBlocks();
+    }
+    return blocks;
   }
 
   /**
    * Updates stats from datanode heartbeat.
    */
-  public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
-      long blockPoolUsed, long cacheCapacity, long cacheUsed, int xceiverCount,
-      int volFailures) {
-    setCapacity(capacity);
-    setRemaining(remaining);
-    setBlockPoolUsed(blockPoolUsed);
-    setDfsUsed(dfsUsed);
+  public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
+      long cacheUsed, int xceiverCount, int volFailures) {
+    long totalCapacity = 0;
+    long totalRemaining = 0;
+    long totalBlockPoolUsed = 0;
+    long totalDfsUsed = 0;
+
     setCacheCapacity(cacheCapacity);
     setCacheUsed(cacheUsed);
     setXceiverCount(xceiverCount);
     setLastUpdate(Time.now());    
     this.volumeFailures = volFailures;
-    this.heartbeatedSinceFailover = true;
+    for (StorageReport report : reports) {
+      DatanodeStorageInfo storage = storageMap.get(report.getStorageID());
+      if (storage == null) {
+        // This is seen during cluster initialization when the heartbeat
+        // is received before the initial block reports from each storage.
+        storage = updateStorage(new DatanodeStorage(report.getStorageID()));
+      }
+      storage.receivedHeartbeat(report);
+      totalCapacity += report.getCapacity();
+      totalRemaining += report.getRemaining();
+      totalBlockPoolUsed += report.getBlockPoolUsed();
+      totalDfsUsed += report.getDfsUsed();
+    }
     rollBlocksScheduled(getLastUpdate());
+
+    // Update total metrics for the node.
+    setCapacity(totalCapacity);
+    setRemaining(totalRemaining);
+    setBlockPoolUsed(totalBlockPoolUsed);
+    setDfsUsed(totalDfsUsed);
   }
 
-  /**
-   * Iterates over the list of blocks belonging to the datanode.
-   */
-  public static class BlockIterator implements Iterator<BlockInfo> {
-    private BlockInfo current;
-    private DatanodeDescriptor node;
-      
-    BlockIterator(BlockInfo head, DatanodeDescriptor dn) {
-      this.current = head;
-      this.node = dn;
+  private static class BlockIterator implements Iterator<BlockInfo> {
+    private int index = 0;
+    private final List<Iterator<BlockInfo>> iterators;
+    
+    private BlockIterator(final DatanodeStorageInfo... storages) {
+      List<Iterator<BlockInfo>> iterators = new ArrayList<Iterator<BlockInfo>>();
+      for (DatanodeStorageInfo e : storages) {
+        iterators.add(e.getBlockIterator());
+      }
+      this.iterators = Collections.unmodifiableList(iterators);
     }
 
     @Override
     public boolean hasNext() {
-      return current != null;
+      update();
+      return !iterators.isEmpty() && iterators.get(index).hasNext();
     }
 
     @Override
     public BlockInfo next() {
-      BlockInfo res = current;
-      current = current.getNext(current.findDatanode(node));
-      return res;
+      update();
+      return iterators.get(index).next();
     }
-
+    
     @Override
-    public void remove()  {
-      throw new UnsupportedOperationException("Sorry. can't remove.");
+    public void remove() {
+      throw new UnsupportedOperationException("Remove unsupported.");
+    }
+    
+    private void update() {
+      while(index < iterators.size() - 1 && !iterators.get(index).hasNext()) {
+        index++;
+      }
     }
   }
 
-  public Iterator<BlockInfo> getBlockIterator() {
-    return new BlockIterator(this.blockList, this);
+  Iterator<BlockInfo> getBlockIterator() {
+    return new BlockIterator(getStorageInfos());
+  }
+  Iterator<BlockInfo> getBlockIterator(final String storageID) {
+    return new BlockIterator(getStorageInfo(storageID));
   }
 
   /**
    * Store block replication work.
    */
-  void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
+  void addBlockToBeReplicated(Block block, DatanodeStorageInfo[] targets) {
     assert(block != null && targets != null && targets.length > 0);
     replicateBlocks.offer(new BlockTargetPair(block, targets));
   }
@@ -526,18 +497,14 @@ public class DatanodeDescriptor extends 
   public int getBlocksScheduled() {
     return currApproxBlocksScheduled + prevApproxBlocksScheduled;
   }
-  
-  /**
-   * Increments counter for number of blocks scheduled. 
-   */
-  public void incBlocksScheduled() {
+
+  /** Increment the number of blocks scheduled. */
+  void incrementBlocksScheduled() {
     currApproxBlocksScheduled++;
   }
   
-  /**
-   * Decrements counter for number of blocks scheduled.
-   */
-  void decBlocksScheduled() {
+  /** Decrement the number of blocks scheduled. */
+  void decrementBlocksScheduled() {
     if (prevApproxBlocksScheduled > 0) {
       prevApproxBlocksScheduled--;
     } else if (currApproxBlocksScheduled > 0) {
@@ -546,12 +513,9 @@ public class DatanodeDescriptor extends 
     // its ok if both counters are zero.
   }
   
-  /**
-   * Adjusts curr and prev number of blocks scheduled every few minutes.
-   */
+  /** Adjusts curr and prev number of blocks scheduled every few minutes. */
   private void rollBlocksScheduled(long now) {
-    if ((now - lastBlocksScheduledRollTime) > 
-        BLOCKS_SCHEDULED_ROLL_INTERVAL) {
+    if (now - lastBlocksScheduledRollTime > BLOCKS_SCHEDULED_ROLL_INTERVAL) {
       prevApproxBlocksScheduled = currApproxBlocksScheduled;
       currApproxBlocksScheduled = 0;
       lastBlocksScheduledRollTime = now;
@@ -647,7 +611,11 @@ public class DatanodeDescriptor extends 
   @Override
   public void updateRegInfo(DatanodeID nodeReg) {
     super.updateRegInfo(nodeReg);
-    firstBlockReport = true; // must re-process IBR after re-registration
+    
+    // must re-process IBR after re-registration
+    for(DatanodeStorageInfo storage : getStorageInfos()) {
+      storage.setBlockReportCount(0);
+    }
   }
 
   /**
@@ -664,26 +632,6 @@ public class DatanodeDescriptor extends 
     this.bandwidth = bandwidth;
   }
 
-  public boolean areBlockContentsStale() {
-    return blockContentsStale;
-  }
-
-  public void markStaleAfterFailover() {
-    heartbeatedSinceFailover = false;
-    blockContentsStale = true;
-  }
-
-  public void receivedBlockReport() {
-    if (heartbeatedSinceFailover) {
-      blockContentsStale = false;
-    }
-    firstBlockReport = false;
-  }
-  
-  boolean isFirstBlockReport() {
-    return firstBlockReport;
-  }
-
   @Override
   public String dumpDatanode() {
     StringBuilder sb = new StringBuilder(super.dumpDatanode());
@@ -702,6 +650,19 @@ public class DatanodeDescriptor extends 
     return sb.toString();
   }
 
+  DatanodeStorageInfo updateStorage(DatanodeStorage s) {
+    synchronized (storageMap) {
+      DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
+      if (storage == null) {
+        LOG.info("Adding new storage ID " + s.getStorageID() +
+                 " for DN " + getXferAddr());
+        storage = new DatanodeStorageInfo(this, s);
+        storageMap.put(s.getStorageID(), storage);
+      }
+      return storage;
+    }
+  }
+
   /**
    * @return   The time at which we last sent caching directives to this 
    *           DataNode, in monotonic milliseconds.
@@ -718,3 +679,4 @@ public class DatanodeDescriptor extends 
     this.lastCachingDirectiveSentTimeMs = time;
   }
 }
+

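For context (not part of the diff): updateHeartbeat now takes per-storage StorageReports, registers any storage ID it has not seen before, and derives the node-level capacity figures by summing the reports. A small sketch of that aggregate-and-register pattern with hypothetical Report and Storage classes:

import java.util.HashMap;
import java.util.Map;

public class NodeStats {
  /** Hypothetical stand-in for StorageReport. */
  static class Report {
    final String storageId;
    final long capacity;
    final long remaining;
    Report(String storageId, long capacity, long remaining) {
      this.storageId = storageId;
      this.capacity = capacity;
      this.remaining = remaining;
    }
  }

  /** Hypothetical stand-in for DatanodeStorageInfo. */
  static class Storage {
    long capacity;
    long remaining;
    void receivedHeartbeat(Report r) {
      capacity = r.capacity;
      remaining = r.remaining;
    }
  }

  private final Map<String, Storage> storageMap = new HashMap<String, Storage>();
  private long totalCapacity;
  private long totalRemaining;

  /** Register unseen storage IDs, update each storage, then roll up node-level totals. */
  synchronized void updateHeartbeat(Report[] reports) {
    long capacity = 0;
    long remaining = 0;
    for (Report r : reports) {
      Storage s = storageMap.get(r.storageId);
      if (s == null) {                       // first heartbeat mentioning this storage
        s = new Storage();
        storageMap.put(r.storageId, s);
      }
      s.receivedHeartbeat(r);
      capacity += r.capacity;
      remaining += r.remaining;
    }
    totalCapacity = capacity;                // node totals are derived from per-storage reports
    totalRemaining = remaining;
  }

  public static void main(String[] args) {
    NodeStats node = new NodeStats();
    node.updateHeartbeat(new Report[] {
        new Report("DS-1", 100L, 40L),
        new Report("DS-2", 200L, 150L) });
    System.out.println(node.totalCapacity + " / " + node.totalRemaining);   // prints 300 / 190
  }
}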
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Fri Jan  3 07:26:52 2014
@@ -424,9 +424,13 @@ public class DatanodeManager {
   }
 
 
-  /** Get a datanode descriptor given corresponding storageID */
-  DatanodeDescriptor getDatanode(final String storageID) {
-    return datanodeMap.get(storageID);
+  /** Get a datanode descriptor given corresponding DatanodeUUID */
+  DatanodeDescriptor getDatanode(final String datanodeUuid) {
+    if (datanodeUuid == null) {
+      return null;
+    }
+
+    return datanodeMap.get(datanodeUuid);
   }
 
   /**
@@ -438,7 +442,7 @@ public class DatanodeManager {
    */
   public DatanodeDescriptor getDatanode(DatanodeID nodeID
       ) throws UnregisteredNodeException {
-    final DatanodeDescriptor node = getDatanode(nodeID.getStorageID());
+    final DatanodeDescriptor node = getDatanode(nodeID.getDatanodeUuid());
     if (node == null) 
       return null;
     if (!node.getXferAddr().equals(nodeID.getXferAddr())) {
@@ -451,6 +455,20 @@ public class DatanodeManager {
     return node;
   }
 
+  public DatanodeStorageInfo[] getDatanodeStorageInfos(
+      DatanodeID[] datanodeID, String[] storageIDs)
+          throws UnregisteredNodeException {
+    if (datanodeID.length == 0) {
+      return null;
+    }
+    final DatanodeStorageInfo[] storages = new DatanodeStorageInfo[datanodeID.length];
+    for(int i = 0; i < datanodeID.length; i++) {
+      final DatanodeDescriptor dd = getDatanode(datanodeID[i]);
+      storages[i] = dd.getStorageInfo(storageIDs[i]);
+    }
+    return storages; 
+  }
+
   /** Prints information about all datanodes. */
   void datanodeDump(final PrintWriter out) {
     synchronized (datanodeMap) {
@@ -528,7 +546,7 @@ public class DatanodeManager {
     // remove  from host2DatanodeMap the datanodeDescriptor removed
     // from datanodeMap before adding node to host2DatanodeMap.
     synchronized(datanodeMap) {
-      host2DatanodeMap.remove(datanodeMap.put(node.getStorageID(), node));
+      host2DatanodeMap.remove(datanodeMap.put(node.getDatanodeUuid(), node));
     }
 
     networktopology.add(node); // may throw InvalidTopologyException
@@ -543,7 +561,7 @@ public class DatanodeManager {
 
   /** Physically remove node from datanodeMap. */
   private void wipeDatanode(final DatanodeID node) {
-    final String key = node.getStorageID();
+    final String key = node.getDatanodeUuid();
     synchronized (datanodeMap) {
       host2DatanodeMap.remove(datanodeMap.remove(key));
     }
@@ -705,8 +723,10 @@ public class DatanodeManager {
   /** Start decommissioning the specified datanode. */
   private void startDecommission(DatanodeDescriptor node) {
     if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
-      LOG.info("Start Decommissioning " + node + " with " + 
-          node.numBlocks() +  " blocks");
+      for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+        LOG.info("Start Decommissioning " + node + " " + storage
+            + " with " + storage.numBlocks() + " blocks");
+      }
       heartbeatManager.startDecommission(node);
       node.decommissioningStatus.setStartTime(now());
       
@@ -729,24 +749,6 @@ public class DatanodeManager {
   }
 
   /**
-   * Generate new storage ID.
-   * 
-   * @return unique storage ID
-   * 
-   * Note: that collisions are still possible if somebody will try 
-   * to bring in a data storage from a different cluster.
-   */
-  private String newStorageID() {
-    String newID = null;
-    while(newID == null) {
-      newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
-      if (datanodeMap.get(newID) != null)
-        newID = null;
-    }
-    return newID;
-  }
-
-  /**
    * Register the given datanode with the namenode. NB: the given
    * registration is mutated and given back to the datanode.
    *
@@ -784,9 +786,9 @@ public class DatanodeManager {
       }
         
       NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
-          + nodeReg + " storage " + nodeReg.getStorageID());
+          + nodeReg + " storage " + nodeReg.getDatanodeUuid());
   
-      DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
+      DatanodeDescriptor nodeS = getDatanode(nodeReg.getDatanodeUuid());
       DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
           nodeReg.getIpAddr(), nodeReg.getXferPort());
         
@@ -821,7 +823,7 @@ public class DatanodeManager {
          */        
           NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
               + " is replaced by " + nodeReg + " with the same storageID "
-              + nodeReg.getStorageID());
+              + nodeReg.getDatanodeUuid());
         }
         
         boolean success = false;
@@ -853,20 +855,8 @@ public class DatanodeManager {
           }
         }
         return;
-      } 
-  
-      // this is a new datanode serving a new data storage
-      if ("".equals(nodeReg.getStorageID())) {
-        // this data storage has never been registered
-        // it is either empty or was created by pre-storageID version of DFS
-        nodeReg.setStorageID(newStorageID());
-        if (NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-              "BLOCK* NameSystem.registerDatanode: "
-              + "new storageID " + nodeReg.getStorageID() + " assigned.");
-        }
       }
-      
+
       DatanodeDescriptor nodeDescr 
         = new DatanodeDescriptor(nodeReg, NetworkTopology.DEFAULT_RACK);
       boolean success = false;
@@ -1234,10 +1224,10 @@ public class DatanodeManager {
 
   /** Handle heartbeat from datanodes. */
   public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
-      final String blockPoolId,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      long cacheCapacity, long cacheUsed, int xceiverCount, int maxTransfers,
-      int failedVolumes) throws IOException {
+      StorageReport[] reports, final String blockPoolId,
+      long cacheCapacity, long cacheUsed, int xceiverCount, 
+      int maxTransfers, int failedVolumes
+      ) throws IOException {
     synchronized (heartbeatManager) {
       synchronized (datanodeMap) {
         DatanodeDescriptor nodeinfo = null;
@@ -1257,9 +1247,9 @@ public class DatanodeManager {
           return new DatanodeCommand[]{RegisterCommand.REGISTER};
         }
 
-        heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
-            remaining, blockPoolUsed, cacheCapacity, cacheUsed, xceiverCount,
-            failedVolumes);
+        heartbeatManager.updateHeartbeat(nodeinfo, reports,
+                                         cacheCapacity, cacheUsed,
+                                         xceiverCount, failedVolumes);
 
         // If we are in safemode, do not send back any recovery / replication
         // requests. Don't even drain the existing queue of work.
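
With the new signature, handleHeartbeat receives one report per storage instead of node-level capacity and usage totals, and any aggregate the namenode still needs can be summed from those reports. A hedged sketch of that aggregation, with a hypothetical Report class standing in for the real StorageReport:

public class HeartbeatAggregationSketch {
  static class Report {
    final long capacity, dfsUsed, remaining, blockPoolUsed;
    Report(long capacity, long dfsUsed, long remaining, long blockPoolUsed) {
      this.capacity = capacity;
      this.dfsUsed = dfsUsed;
      this.remaining = remaining;
      this.blockPoolUsed = blockPoolUsed;
    }
  }

  public static void main(String[] args) {
    Report[] reports = {
        new Report(1000L, 200L, 800L, 150L),
        new Report(2000L, 500L, 1500L, 400L),
    };
    long capacity = 0, dfsUsed = 0, remaining = 0, blockPoolUsed = 0;
    // Node-level totals are no longer passed in; they fall out of the
    // per-storage reports.
    for (Report r : reports) {
      capacity += r.capacity;
      dfsUsed += r.dfsUsed;
      remaining += r.remaining;
      blockPoolUsed += r.blockPoolUsed;
    }
    System.out.printf("capacity=%d dfsUsed=%d remaining=%d bpUsed=%d%n",
        capacity, dfsUsed, remaining, blockPoolUsed);
  }
}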
@@ -1274,32 +1264,32 @@ public class DatanodeManager {
           BlockRecoveryCommand brCommand = new BlockRecoveryCommand(
               blocks.length);
           for (BlockInfoUnderConstruction b : blocks) {
-            DatanodeDescriptor[] expectedLocations = b.getExpectedLocations();
+            final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations();
             // Skip stale nodes during recovery - not heart beated for some time (30s by default).
-            List<DatanodeDescriptor> recoveryLocations =
-                new ArrayList<DatanodeDescriptor>(expectedLocations.length);
-            for (int i = 0; i < expectedLocations.length; i++) {
-              if (!expectedLocations[i].isStale(this.staleInterval)) {
-                recoveryLocations.add(expectedLocations[i]);
+            final List<DatanodeStorageInfo> recoveryLocations =
+                new ArrayList<DatanodeStorageInfo>(storages.length);
+            for (int i = 0; i < storages.length; i++) {
+              if (!storages[i].getDatanodeDescriptor().isStale(staleInterval)) {
+                recoveryLocations.add(storages[i]);
               }
             }
             // If we only get 1 replica after eliminating stale nodes, then choose all
             // replicas for recovery and let the primary data node handle failures.
             if (recoveryLocations.size() > 1) {
-              if (recoveryLocations.size() != expectedLocations.length) {
+              if (recoveryLocations.size() != storages.length) {
                 LOG.info("Skipped stale nodes for recovery : " +
-                    (expectedLocations.length - recoveryLocations.size()));
+                    (storages.length - recoveryLocations.size()));
               }
               brCommand.add(new RecoveringBlock(
                   new ExtendedBlock(blockPoolId, b),
-                  recoveryLocations.toArray(new DatanodeDescriptor[recoveryLocations.size()]),
+                  DatanodeStorageInfo.toDatanodeInfos(recoveryLocations),
                   b.getBlockRecoveryId()));
             } else {
               // If too many replicas are stale, then choose all replicas to participate
               // in block recovery.
               brCommand.add(new RecoveringBlock(
                   new ExtendedBlock(blockPoolId, b),
-                  expectedLocations,
+                  DatanodeStorageInfo.toDatanodeInfos(storages),
                   b.getBlockRecoveryId()));
             }
           }
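
The recovery path keeps its previous shape after the switch to DatanodeStorageInfo: drop storages whose datanode has not heartbeated within the stale interval, and if at most one candidate survives, fall back to every known replica and let the primary datanode cope with failures. A standalone sketch of that selection rule; the Storage type and the 30-second threshold below are placeholders:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RecoveryLocationSketch {
  static class Storage {
    final String id;
    final long lastHeartbeatMillis;
    Storage(String id, long lastHeartbeatMillis) {
      this.id = id;
      this.lastHeartbeatMillis = lastHeartbeatMillis;
    }
    boolean isStale(long now, long staleIntervalMillis) {
      return now - lastHeartbeatMillis > staleIntervalMillis;
    }
  }

  // Prefer non-stale replicas; if fewer than two remain, use all replicas and
  // let the primary datanode handle failures during recovery.
  static List<Storage> chooseRecoveryLocations(Storage[] storages, long now,
      long staleIntervalMillis) {
    List<Storage> fresh = new ArrayList<>(storages.length);
    for (Storage s : storages) {
      if (!s.isStale(now, staleIntervalMillis)) {
        fresh.add(s);
      }
    }
    return fresh.size() > 1 ? fresh : Arrays.asList(storages);
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    Storage[] storages = {
        new Storage("DS-1", now - 5_000),
        new Storage("DS-2", now - 120_000),   // stale at a 30s threshold
        new Storage("DS-3", now - 10_000),
    };
    for (Storage s : chooseRecoveryLocations(storages, now, 30_000)) {
      System.out.println("recover on " + s.id);
    }
  }
}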
@@ -1416,7 +1406,9 @@ public class DatanodeManager {
    LOG.info("Marking all datanodes as stale");
     synchronized (datanodeMap) {
       for (DatanodeDescriptor dn : datanodeMap.values()) {
-        dn.markStaleAfterFailover();
+        for(DatanodeStorageInfo storage : dn.getStorageInfos()) {
+          storage.markStaleAfterFailover();
+        }
       }
     }
   }
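
Because block counts and staleness now live on DatanodeStorageInfo, node-level operations such as the decommission logging earlier in this patch and this failover marking loop fan out over every storage the node reports. A small illustrative sketch of that per-storage fan-out (StorageSketch and the sample values are made up):

import java.util.Arrays;
import java.util.List;

public class PerStorageFanoutSketch {
  static class StorageSketch {
    final String storageUuid;
    final long numBlocks;
    boolean stale;
    StorageSketch(String storageUuid, long numBlocks) {
      this.storageUuid = storageUuid;
      this.numBlocks = numBlocks;
    }
  }

  public static void main(String[] args) {
    List<StorageSketch> storages = Arrays.asList(
        new StorageSketch("DS-aaa", 120), new StorageSketch("DS-bbb", 80));

    // Decommission start: one log line per storage instead of one per node.
    for (StorageSketch s : storages) {
      System.out.println("Start Decommissioning dn-1 " + s.storageUuid
          + " with " + s.numBlocks + " blocks");
    }

    // Failover: staleness is likewise tracked per storage, so the node-level
    // operation touches each storage in turn.
    for (StorageSketch s : storages) {
      s.stale = true;
    }
  }
}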
@@ -1451,7 +1443,15 @@ public class DatanodeManager {
     return getClass().getSimpleName() + ": " + host2DatanodeMap;
   }
 
+  public void clearPendingCachingCommands() {
+    for (DatanodeDescriptor dn : datanodeMap.values()) {
+      dn.getPendingCached().clear();
+      dn.getPendingUncached().clear();
+    }
+  }
+
   public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
     this.shouldSendCachingCommands = shouldSendCachingCommands;
   }
 }
+

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Fri Jan  3 07:26:52 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
@@ -181,7 +182,7 @@ class HeartbeatManager implements Datano
       addDatanode(d);
 
       //update its timestamp
-      d.updateHeartbeat(0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
+      d.updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
     }
   }
 
@@ -203,11 +204,11 @@ class HeartbeatManager implements Datano
   }
 
   synchronized void updateHeartbeat(final DatanodeDescriptor node,
-      long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes) {
+      StorageReport[] reports, long cacheCapacity, long cacheUsed,
+      int xceiverCount, int failedVolumes) {
     stats.subtract(node);
-    node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
-        cacheCapacity, cacheUsed, xceiverCount, failedVolumes);
+    node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
+      xceiverCount, failedVolumes);
     stats.add(node);
   }
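
updateHeartbeat keeps the existing discipline of removing the node's old contribution from the aggregate, applying the new figures, and adding the contribution back, so cluster-wide totals never double-count a node. The idiom in isolation, with placeholder NodeStats and ClusterStats types:

public class StatsUpdateSketch {
  static class NodeStats {
    long capacity;
    long dfsUsed;
  }

  static class ClusterStats {
    long capacity;
    long dfsUsed;
    void subtract(NodeStats n) { capacity -= n.capacity; dfsUsed -= n.dfsUsed; }
    void add(NodeStats n)      { capacity += n.capacity; dfsUsed += n.dfsUsed; }
  }

  public static void main(String[] args) {
    ClusterStats cluster = new ClusterStats();
    NodeStats node = new NodeStats();
    node.capacity = 1000L;
    node.dfsUsed = 100L;
    cluster.add(node);

    // Heartbeat arrives: remove the node's old contribution, apply the new
    // figures, then add the contribution back.
    cluster.subtract(node);
    node.dfsUsed = 250L;
    cluster.add(node);

    System.out.println("cluster dfsUsed=" + cluster.dfsUsed);  // 250
  }
}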
 
@@ -358,3 +359,4 @@ class HeartbeatManager implements Datano
     }
   }
 }
+

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Fri Jan  3 07:26:52 2014
@@ -78,10 +78,10 @@ class InvalidateBlocks {
    */
   synchronized void add(final Block block, final DatanodeInfo datanode,
       final boolean log) {
-    LightWeightHashSet<Block> set = node2blocks.get(datanode.getStorageID());
+    LightWeightHashSet<Block> set = node2blocks.get(datanode.getDatanodeUuid());
     if (set == null) {
       set = new LightWeightHashSet<Block>();
-      node2blocks.put(datanode.getStorageID(), set);
+      node2blocks.put(datanode.getDatanodeUuid(), set);
     }
     if (set.add(block)) {
       numBlocks++;
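
With the switch to getDatanodeUuid(), the per-node invalidation queues are keyed by datanode UUID, and the add path is a plain get-or-create on that map. A self-contained sketch with JDK collections; InvalidateQueueSketch and block ids as longs are simplifications of the real class:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class InvalidateQueueSketch {
  private final Map<String, Set<Long>> node2blocks = new HashMap<>();
  private long numBlocks = 0;

  // Queue a block id for invalidation on the datanode with the given UUID.
  synchronized void add(String datanodeUuid, long blockId) {
    Set<Long> set = node2blocks.get(datanodeUuid);
    if (set == null) {
      set = new HashSet<>();
      node2blocks.put(datanodeUuid, set);
    }
    if (set.add(blockId)) {
      numBlocks++;
    }
  }

  public static void main(String[] args) {
    InvalidateQueueSketch q = new InvalidateQueueSketch();
    q.add("dn-uuid-1", 1001L);
    q.add("dn-uuid-1", 1001L);   // duplicate, not counted twice
    q.add("dn-uuid-2", 1002L);
    System.out.println("queued blocks: " + q.numBlocks);  // 2
  }
}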

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/MutableBlockCollection.java Fri Jan  3 07:26:52 2014
@@ -34,5 +34,5 @@ public interface MutableBlockCollection 
    * and set the locations.
    */
   public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
-      DatanodeDescriptor[] locations) throws IOException;
+      DatanodeStorageInfo[] storages) throws IOException;
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingDataNodeMessages.java Fri Jan  3 07:26:52 2014
@@ -42,11 +42,13 @@ class PendingDataNodeMessages {
   static class ReportedBlockInfo {
     private final Block block;
     private final DatanodeDescriptor dn;
+    private final String storageID;
     private final ReplicaState reportedState;
 
-    ReportedBlockInfo(DatanodeDescriptor dn, Block block,
+    ReportedBlockInfo(DatanodeDescriptor dn, String storageID, Block block,
         ReplicaState reportedState) {
       this.dn = dn;
+      this.storageID = storageID;
       this.block = block;
       this.reportedState = reportedState;
     }
@@ -58,6 +60,10 @@ class PendingDataNodeMessages {
     DatanodeDescriptor getNode() {
       return dn;
     }
+    
+    String getStorageID() {
+      return storageID;
+    }
 
     ReplicaState getReportedState() {
       return reportedState;
@@ -70,11 +76,11 @@ class PendingDataNodeMessages {
     }
   }
   
-  void enqueueReportedBlock(DatanodeDescriptor dn, Block block,
+  void enqueueReportedBlock(DatanodeDescriptor dn, String storageID, Block block,
       ReplicaState reportedState) {
     block = new Block(block);
     getBlockQueue(block).add(
-        new ReportedBlockInfo(dn, block, reportedState));
+        new ReportedBlockInfo(dn, storageID, block, reportedState));
     count++;
   }
   

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Fri Jan  3 07:26:52 2014
@@ -117,6 +117,18 @@ public class JspHelper {
       return 0;
     }
   }
+  
+  /**
+   * convenience method for canonicalizing host name.
+   * @param addr name:port or name 
+   * @return canonicalized host name
+   */
+   public static String canonicalize(String addr) {
+    // default port 1 is supplied to allow addr without port.
+    // the port will be ignored.
+    return NetUtils.createSocketAddr(addr, 1).getAddress()
+           .getCanonicalHostName();
+  }
 
   /**
    * A helper class that generates the correct URL for different schema.
@@ -124,10 +136,11 @@ public class JspHelper {
    */
   public static final class Url {
     public static String authority(String scheme, DatanodeID d) {
+      String fqdn = canonicalize(d.getIpAddr());
       if (scheme.equals("http")) {
-        return d.getInfoAddr();
+        return fqdn + ":" + d.getInfoPort();
       } else if (scheme.equals("https")) {
-        return d.getInfoSecureAddr();
+        return fqdn + ":" + d.getInfoSecurePort();
       } else {
         throw new IllegalArgumentException("Unknown scheme:" + scheme);
       }
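
The authority string is now assembled from a canonical FQDN plus the scheme-specific info port rather than from the pre-formatted address strings on DatanodeID. A sketch of the same idea using only java.net instead of Hadoop's NetUtils; the port numbers in main are made up:

import java.net.InetSocketAddress;

public class AuthoritySketch {
  // Resolve an IP or host name to its canonical host name; the dummy port 1
  // only satisfies the InetSocketAddress constructor and is otherwise ignored.
  static String canonicalize(String host) {
    return new InetSocketAddress(host, 1).getAddress().getCanonicalHostName();
  }

  static String authority(String scheme, String ipAddr, int infoPort,
      int infoSecurePort) {
    String fqdn = canonicalize(ipAddr);
    if (scheme.equals("http")) {
      return fqdn + ":" + infoPort;
    } else if (scheme.equals("https")) {
      return fqdn + ":" + infoSecurePort;
    } else {
      throw new IllegalArgumentException("Unknown scheme:" + scheme);
    }
  }

  public static void main(String[] args) {
    System.out.println(authority("http", "127.0.0.1", 50075, 50475));
  }
}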

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java Fri Jan  3 07:26:52 2014
@@ -236,6 +236,8 @@ public abstract class Storage extends St
     final boolean useLock;        // flag to enable storage lock
     final StorageDirType dirType; // storage dir type
     FileLock lock;                // storage lock
+
+    private String storageUuid = null;      // Storage directory identifier.
     
     public StorageDirectory(File dir) {
       // default dirType is null
@@ -246,6 +248,14 @@ public abstract class Storage extends St
       this(dir, dirType, true);
     }
     
+    public void setStorageUuid(String storageUuid) {
+      this.storageUuid = storageUuid;
+    }
+
+    public String getStorageUuid() {
+      return storageUuid;
+    }
+
     /**
      * Constructor
      * @param dir directory corresponding to the storage

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Jan  3 07:26:52 2014
@@ -27,6 +27,7 @@ import java.util.concurrent.CopyOnWriteA
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -147,7 +148,7 @@ class BPOfferService {
     return false;
   }
   
-  String getBlockPoolId() {
+  synchronized String getBlockPoolId() {
     if (bpNSInfo != null) {
       return bpNSInfo.getBlockPoolID();
     } else {
@@ -160,31 +161,32 @@ class BPOfferService {
   synchronized NamespaceInfo getNamespaceInfo() {
     return bpNSInfo;
   }
-  
+
   @Override
-  public String toString() {
+  public synchronized String toString() {
     if (bpNSInfo == null) {
       // If we haven't yet connected to our NN, we don't yet know our
       // own block pool ID.
       // If _none_ of the block pools have connected yet, we don't even
-      // know the storage ID of this DN.
-      String storageId = dn.getStorageId();
-      if (storageId == null || "".equals(storageId)) {
-        storageId = "unknown";
+      // know the DatanodeID of this DN.
+      String datanodeUuid = dn.getDatanodeUuid();
+
+      if (datanodeUuid == null || datanodeUuid.isEmpty()) {
+        datanodeUuid = "unassigned";
       }
-      return "Block pool <registering> (storage id " + storageId +
-        ")";
+      return "Block pool <registering> (Datanode Uuid " + datanodeUuid + ")";
     } else {
       return "Block pool " + getBlockPoolId() +
-        " (storage id " + dn.getStorageId() +
-        ")";
+          " (Datanode Uuid " + dn.getDatanodeUuid() +
+          ")";
     }
   }
   
-  void reportBadBlocks(ExtendedBlock block) {
+  void reportBadBlocks(ExtendedBlock block,
+                       String storageUuid, StorageType storageType) {
     checkBlock(block);
     for (BPServiceActor actor : bpServices) {
-      actor.reportBadBlocks(block);
+      actor.reportBadBlocks(block, storageUuid, storageType);
     }
   }
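
reportBadBlocks, like the notifyNamenode* methods below, now carries the storage that holds the replica and forwards it to every BPServiceActor, one per namenode of the block pool. A hedged sketch of that fan-out with hypothetical Actor and LoggingActor types:

import java.util.Arrays;
import java.util.List;

public class ActorFanoutSketch {
  interface Actor {
    void reportBadBlock(String blockId, String storageUuid, String storageType);
  }

  static class LoggingActor implements Actor {
    private final String namenode;
    LoggingActor(String namenode) { this.namenode = namenode; }
    @Override
    public void reportBadBlock(String blockId, String storageUuid,
        String storageType) {
      System.out.println(namenode + " <- bad block " + blockId
          + " on " + storageUuid + " (" + storageType + ")");
    }
  }

  public static void main(String[] args) {
    // One actor per namenode of the block pool (e.g. active and standby).
    List<Actor> actors = Arrays.<Actor>asList(
        new LoggingActor("nn1"), new LoggingActor("nn2"));

    // The offer service forwards the storage holding the replica along with
    // the block, so every namenode learns which storage is affected.
    for (Actor actor : actors) {
      actor.reportBadBlock("blk_1001", "DS-aaa", "DISK");
    }
  }
}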
   
@@ -193,7 +195,8 @@ class BPOfferService {
    * till namenode is informed before responding with success to the
    * client? For now we don't.
    */
-  void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
+  void notifyNamenodeReceivedBlock(
+      ExtendedBlock block, String delHint, String storageUuid) {
     checkBlock(block);
     checkDelHint(delHint);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
@@ -202,7 +205,7 @@ class BPOfferService {
         delHint);
 
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlockImmediately(bInfo);
+      actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
     }
   }
 
@@ -219,23 +222,23 @@ class BPOfferService {
         "delHint is null");
   }
 
-  void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+  void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
        block.getLocalBlock(), BlockStatus.DELETED_BLOCK, null);
     
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeDeletedBlock(bInfo);
+      actor.notifyNamenodeDeletedBlock(bInfo, storageUuid);
     }
   }
   
-  void notifyNamenodeReceivingBlock(ExtendedBlock block) {
+  void notifyNamenodeReceivingBlock(ExtendedBlock block, String storageUuid) {
     checkBlock(block);
     ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
        block.getLocalBlock(), BlockStatus.RECEIVING_BLOCK, null);
     
     for (BPServiceActor actor : bpServices) {
-      actor.notifyNamenodeBlockImmediately(bInfo);
+      actor.notifyNamenodeBlockImmediately(bInfo, storageUuid);
     }
   }
 
@@ -274,12 +277,22 @@ class BPOfferService {
   synchronized void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
     if (this.bpNSInfo == null) {
       this.bpNSInfo = nsInfo;
-      
+      boolean success = false;
+
       // Now that we know the namespace ID, etc, we can pass this to the DN.
       // The DN can now initialize its local storage if we are the
       // first BP to handshake, etc.
-      dn.initBlockPool(this);
-      return;
+      try {
+        dn.initBlockPool(this);
+        success = true;
+      } finally {
+        if (!success) {
+          // The datanode failed to initialize the BP. We need to reset
+          // the namespace info so that other BPService actors still have
+          // a chance to set it, and re-initialize the datanode.
+          this.bpNSInfo = null;
+        }
+      }
     } else {
       checkNSEquality(bpNSInfo.getBlockPoolID(), nsInfo.getBlockPoolID(),
           "Blockpool ID");
@@ -328,7 +341,7 @@ class BPOfferService {
     }
   }
 
-  synchronized DatanodeRegistration createRegistration() {
+  synchronized DatanodeRegistration createRegistration() throws IOException {
     Preconditions.checkState(bpNSInfo != null,
         "getRegistration() can only be called after initial handshake");
     return dn.createBPRegistration(bpNSInfo);


