hadoop-hdfs-issues mailing list archives

From "Todd Lipcon (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (HDFS-5053) NameNode should invoke DataNode APIs to coordinate caching
Date Wed, 11 Sep 2013 00:25:52 GMT

    [ https://issues.apache.org/jira/browse/HDFS-5053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13763762#comment-13763762 ]

Todd Lipcon commented on HDFS-5053:
-----------------------------------

I did my review by loading the patch into my IDE and just adding comments as Java comments.
Here's the "diff" of my review:

{code}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
index 7c0123d..2d31573 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
@@ -45,6 +45,10 @@
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+// TODO: document -- include the thread safety considerations. It seems like this
+// class isn't itself thread-safe, so needs to be protected by another lock (eg the
+// BlockManager lock?)
+// TODO: annotate
 public class CacheReplicationManager {
 
   private static final Log LOG =
@@ -87,7 +91,7 @@ public long getPendingUncacheBlocksCount() {
   /**
    * Blocks to be uncached
    */
-  private final InvalidateCacheBlocks uncacheBlocks;
+  private final InvalidateCacheBlocks uncacheBlocks; // rename this to "blocksToUncache"
   /**
    * Blocks that need to be cached
    */
@@ -206,6 +210,11 @@ public void processCacheReport(final DatanodeID nodeID, final String poolId,
             "processCacheReport from dead or unregistered node: " + nodeID);
       }
 
+      // TODO: Can we simplify this somewhat by relaxing how quickly we get the cache
+      // manager up to its full state? eg maybe we never handle cache reports
+      // until we're out of safemode? Maybe that's problematic, but would be good
+      // to kill some complexity in this class, since it duplicates lots of BM.
+
       // To minimize startup time, we discard any second (or later) block reports
       // that we receive while still in startup phase.
       if (namesystem.isInStartupSafeMode() && !node.isFirstBlockReport()) {
@@ -242,10 +251,21 @@ public void processCacheReport(final DatanodeID nodeID, final String poolId,
    * @param node Datanode sending a cache report
    * @param report cache report
    * @throws IOException
+   *
+   * TODO: can we share code somehow a little better here? Even if we have
+   * to extract some interface which both BlockManager and CacheReplicationManager
+   * implement, eg something with a newBlockReported(), existingBlockReportedGone(),
+   * etc type methods used as callbacks from the algorithm? Seems a shame to duplicate
+   * all this stuff.
    */
   private void processCacheReport(final DatanodeDescriptor node,
       final BlockListAsLongs report) throws IOException {
     // TODO: queued processing on the standby
+    // rather than being correct, I think it's better to be simple + sloppy here, since
+    // the standby-side queueing stuff is pretty messy. How bad is the cache churn likely
+    // to be if we do it "wrong" here? Or would we get a bunch of warnings/errors trying
+    // to look up blocks which don't exist? -todd
+
     // TODO: proper handling for corrupt replicas
     Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
     Collection<Block> toRemove = new LinkedList<Block>();
@@ -347,6 +367,12 @@ private BlockInfo processReportedCachedBlock(final DatanodeDescriptor dn,
     if (storedBlock == null) {
       // If blocksMap does not contain reported block id,
       // the replica should be removed from the data-node.
+      //
+      // TODO: is this actually something that could happen? Do we actually
+      // need to explicitly un-cache it, or can we just assume that it has also
+      // reported this block to the BlockManager, which will tell the DN to delete
+      // it, and thus get implicitly uncached? At the very least we should
+      // make sure that the tests get code coverage here.
       toInvalidate.add(new Block(block));
       return null;
     }
@@ -363,6 +389,8 @@ private BlockInfo processReportedCachedBlock(final DatanodeDescriptor dn,
         // TODO: queuing cache operations on the standby
         return null;
       } else {
+        // TODO: Probably worth adding trace logging for these cases -- both  
+        // here and in the other return paths below.
         toInvalidate.add(block);
         return null;
       }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 325a5ff..2195f43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -108,6 +108,8 @@ public void run() {
 
   /**
    * Assigns under replicated blocks to new datanodes based on priority.
+   * TODO: would rather use distinct terminology like "under-cached" and "over-cached" here
+   * and elsewhere in this patch, just so readers are very clear.
    */
   private void computeCachingWork() {
     List<List<Block>> blocksToCache = null;
@@ -132,6 +134,9 @@ private void computeCachingWorkForBlocks(List<List<Block>> blocksToCache) {
     try {
       synchronized (neededCacheBlocks) {
         for (int priority = 0; priority < blocksToCache.size(); priority++) {
+          // TODO: the priority thing is useless here, right? since we always iterate
+          // through all of them? Given that, is there any benefit to using UnderReplicatedBlocks
+          // instead of something simpler like a single LightWeightLinkedSet?
           for (Block block : blocksToCache.get(priority)) {
             // Required number of cached replicas
             requiredRepl = cacheReplManager.getCacheReplication(block);
@@ -139,6 +144,8 @@ private void computeCachingWorkForBlocks(List<List<Block>> blocksToCache) {
             cachedNodes = cacheReplManager.getSafeReplicas(
                 cacheReplManager.cachedBlocksMap, block);
             // Replicas that are safely stored on disk
+            // TODO: you sure the below is safe? you can reach into blockManager
+            // like this without any locking?
             storedNodes = cacheReplManager.getSafeReplicas(
                 blockManager.blocksMap, block);
             // "effective" replication factor which includes pending
@@ -149,7 +156,8 @@ private void computeCachingWorkForBlocks(List<List<Block>> blocksToCache) {
               neededCacheBlocks.remove(block, priority);
               blockLog.info("BLOCK* Removing " + block
                   + " from neededCacheBlocks as it has enough replicas");
-                continue;
+              // TODO: better log message clarity -- better to say "enough cached replicas" or something
+              continue;
             }
             // Choose some replicas to cache if needed
             additionalRepl = requiredRepl - effectiveRepl;
@@ -229,6 +237,7 @@ private void computeCachingWorkForBlocks(List<List<Block>> blocksToCache) {
   /**
    * Reassign replication work that has timed out
    */
+  // TODO: rename to processPendingCaches
   private void processPendingReplications() {
     Block[] timedOutItems = pendingCacheBlocks.getTimedOutBlocks();
     if (timedOutItems != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
index 3573b31..d6a3841 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
@@ -28,17 +28,23 @@
 import org.apache.commons.math.random.RandomDataImpl;
 import org.apache.hadoop.hdfs.protocol.Block;
 
+// TODO: Annotate
 public class CacheReplicationPolicy {
 
   /**
    * Prunes datanodes with insufficient capacity to cache the block.
    * 
    * @return Pruned list of datanodes
+   // TODO: above isn't that clear -- maybe rename this to "selectSufficientCapacity"
+   // and "return the list of datanodes with sufficient capacity"? I kind of thought
+   // it was going to return the list of datanodes which _didn't_ fit (i.e return the
+   // pruned nodes, rather than the remaining nodes post-pruning)
    */
   private static List<DatanodeDescriptor> pruneInsuffientCapacity(Block block,
       List<DatanodeDescriptor> targets) {
     List<DatanodeDescriptor> pruned =
         new ArrayList<DatanodeDescriptor>(targets.size());
+    // TODO: just use a foreach loop below?
     for (Iterator<DatanodeDescriptor> it = targets.iterator(); it.hasNext();) {
       DatanodeDescriptor dn = it.next();
       long remaining = dn.getCacheRemaining();
@@ -67,7 +73,7 @@ private static DatanodeDescriptor randomDatanodeByRemainingCache(Block block,
     TreeMap<Long, DatanodeDescriptor> lottery =
         new TreeMap<Long, DatanodeDescriptor>();
     long totalCacheAvailable = 0;
-    for (Iterator<DatanodeDescriptor> it = targets.iterator(); it.hasNext();) {
+    for (Iterator<DatanodeDescriptor> it = targets.iterator(); it.hasNext();) { // TODO: use foreach?
       DatanodeDescriptor dn = it.next();
       long remaining = dn.getCacheRemaining();
       totalCacheAvailable += remaining;
@@ -91,7 +97,7 @@ private static DatanodeDescriptor randomDatanodeByRemainingCache(Block block,
     List<DatanodeDescriptor> pruned = pruneInsuffientCapacity(block, targets);
     List<DatanodeDescriptor> chosen =
         new ArrayList<DatanodeDescriptor>(numTargets);
-    for (int i = 0; i < numTargets && pruned.size() > 0; i++) {
+    for (int i = 0; i < numTargets && !pruned.isEmpty(); i++) {
       chosen.add(randomDatanodeByRemainingCache(block, pruned));
     }
     return chosen;
@@ -113,7 +119,7 @@ private static DatanodeDescriptor randomDatanodeByRemainingCache(Block block,
     Collections.shuffle(nodes);
     final int additionalTargetsNeeded = effectiveReplication - replication;
     int chosen = 0;
-    while (chosen < additionalTargetsNeeded && nodes.size() > 0) {
+    while (chosen < additionalTargetsNeeded && !nodes.isEmpty()) {
       targets.add(nodes.get(chosen));
       chosen++;
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 26a6ce9..c7a9f2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -156,6 +156,7 @@ synchronized void clear() {
   /** A queue of blocks to be cached by this datanode */
   private BlockQueue<Block> cacheBlocks = new BlockQueue<Block>();
   /** A set of blocks to be uncached by this datanode */
+  // TODO: rename to blocksToUncache
   private LightWeightHashSet<Block> uncacheBlocks =
       new LightWeightHashSet<Block>();
 
@@ -290,6 +291,9 @@ int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
    * @return true if block was successfully added, false if already present
    */
   public boolean addCachedBlock(BlockInfo b) {
+    // TODO: any way to add an assertion that this BlockInfo is in fact
+    // a cloned copy? ie that we don't accidentally end up with one BlockInfo
+    // that is on both the cached and uncached lists?
     if (!b.addNode(this))
       return false;
     // add to the head of the data-node list
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateCacheBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateCacheBlocks.java
index e7cbee9..1ae963f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateCacheBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateCacheBlocks.java
@@ -34,6 +34,13 @@
   synchronized List<Block> invalidateWork(
       final String storageId, final DatanodeDescriptor dn) {
     final List<Block> toInvalidate = invalidateWorkInternal(storageId);
+// This is a little strange, since invalidateWorkInternal will use
+// datanodeManager.blockInvalidateLimit, which isn't really related to caching.
+// Would it be possible to split the data structure portion out of
+// InvalidateBlocks, and give it a method like pollN(storageID, limit)? Then
+// InvalidateCacheBlocks and InvalidateBlocks wouldn't have to have the inheritance
+// relationship (and would also help with things like the now-inappropriate dump() method
+// in the superclass which claims these blocks are ready to be deleted).
     if (toInvalidate != null) {
       dn.addBlocksToBeUncached(toInvalidate);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
index 6b07b78..01c31f2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
@@ -35,6 +35,8 @@
 /***************************************************
  * PendingReplicationBlocks does the bookkeeping of all
  * blocks that are getting replicated.
+ * TODO: add a note here that this is used both for replication of
+ * on-disk replicas as well as for pending requests to cache blocks.
  *
  * It does the following:
  * 1)  record blocks that are getting replicated at this instant.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
index 555cec4..b210a6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
@@ -61,6 +61,7 @@
  *   The policy here is to keep those corrupt blocks replicated, but give
  *   blocks that are not corrupt higher priority.</li>
  * </ol>
+ * TODO: add note that this is used for "under-cached" blocks as well.
  */
 class UnderReplicatedBlocks implements Iterable<Block> {
   /** The total number of queues : {@value} */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 15a7d2c..68d28d0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -105,6 +105,14 @@ static long combinePreferredBlockSize(long header, long blockSize) {
   private BlockInfo[] blocks;
 
   private short cacheReplication = 0;
+  // TODO: Are we cool sparing the extra 2 bytes on every INode? I haven't seen this
+  // explicitly discussed anywhere, but admittedly I haven't followed as closely
+  // as I should have. Given that we expect cache capacity to be ~2 orders of magnitude
+  // smaller than disk capacity, it seems like it's a bit wasteful to reserve these bits
+  // in every file when 99% of files will set it to 0. I recall seeing another JIRA
+  // that discussed adding attributes to INodes without the cost of adding
+  // a field to every INode object. Another alternative is to steal some bits from the
+  // existing 64-bit header -- 48 bits is a lot for block size, as is 16 bits for replication.
 
   INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
       BlockInfo[] blklist, short replication, long preferredBlockSize) {
{code}
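
To make the interface-extraction idea in the processCacheReport() comment above a bit more concrete, here's a very rough sketch of what I have in mind. All the names (BlockReportListener, ReportDiffer, etc.) are made up for illustration -- nothing here is from the patch, and it glosses over the real BlockListAsLongs handling:

{code}
import java.util.Iterator;

/**
 * Hypothetical callback interface driven by a shared report-diffing routine.
 * BlockManager and CacheReplicationManager would each supply their own
 * implementation, so the diff logic itself lives in one place.
 */
interface BlockReportListener<B> {
  /** A reported block the NN has no record of on this datanode. */
  void newBlockReported(B block);

  /** A block the NN thought was on this datanode but was not reported. */
  void existingBlockGone(B block);
}

/** Sketch of the shared diff, assuming both sides iterate in sorted order. */
final class ReportDiffer {
  static <B extends Comparable<B>> void diff(Iterator<B> reported,
      Iterator<B> stored, BlockReportListener<B> listener) {
    B r = reported.hasNext() ? reported.next() : null;
    B s = stored.hasNext() ? stored.next() : null;
    while (r != null || s != null) {
      int cmp = (r == null) ? 1 : (s == null) ? -1 : r.compareTo(s);
      if (cmp < 0) {
        // reported but not known to be stored -> new
        listener.newBlockReported(r);
        r = reported.hasNext() ? reported.next() : null;
      } else if (cmp > 0) {
        // known to be stored but not reported -> gone
        listener.existingBlockGone(s);
        s = stored.hasNext() ? stored.next() : null;
      } else {
        // present on both sides, nothing to do
        r = reported.hasNext() ? reported.next() : null;
        s = stored.hasNext() ? stored.next() : null;
      }
    }
  }
}
{code}

Then the two report-processing paths would mostly differ in which listener they pass in, plus whatever toAdd/toRemove/toInvalidate bookkeeping each side needs.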
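
Similarly, for the InvalidateCacheBlocks comment above, the split I'm picturing is composition rather than inheritance: a dumb per-storage queue with a bounded poll, which the deletion path and the uncache path each wrap with their own limit and logging. Again just a sketch with made-up names:

{code}
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical data-structure-only class: blocks keyed by storage ID with a
 * bounded poll. InvalidateBlocks and InvalidateCacheBlocks could each hold one
 * of these instead of sharing an inheritance relationship.
 */
class PerStorageBlockQueue<B> {
  private final Map<String, LinkedHashSet<B>> queues =
      new HashMap<String, LinkedHashSet<B>>();

  synchronized void add(String storageId, B block) {
    LinkedHashSet<B> queue = queues.get(storageId);
    if (queue == null) {
      queue = new LinkedHashSet<B>();
      queues.put(storageId, queue);
    }
    queue.add(block);
  }

  /** Removes and returns up to {@code limit} blocks queued for this storage. */
  synchronized List<B> pollN(String storageId, int limit) {
    List<B> polled = new ArrayList<B>();
    LinkedHashSet<B> queue = queues.get(storageId);
    if (queue == null) {
      return polled;
    }
    Iterator<B> it = queue.iterator();
    while (it.hasNext() && polled.size() < limit) {
      polled.add(it.next());
      it.remove();
    }
    if (queue.isEmpty()) {
      queues.remove(storageId);
    }
    return polled;
  }
}
{code}

invalidateWork() in the cache case would then just be pollN(storageId, someCacheSpecificLimit) followed by dn.addBlocksToBeUncached(...), and the dump() wording problem goes away because each wrapper owns its own messages.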
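
And for the INodeFile comment above, the header-bits alternative could look something like the sketch below. The widths are purely illustrative (just a narrower split than the existing 16-bit replication / 48-bit block size layout mentioned above); the point is only that cacheReplication wouldn't need its own field on every INodeFile:

{code}
/**
 * Hypothetical repartitioning of the existing 64-bit INodeFile header:
 * 4 bits cache replication | 12 bits replication | 48 bits preferred block size.
 */
final class PackedHeader {
  private static final int BLOCK_SIZE_BITS = 48;
  private static final int REPLICATION_BITS = 12;
  private static final int CACHE_REPL_BITS = 4;

  private static final long BLOCK_SIZE_MASK = (1L << BLOCK_SIZE_BITS) - 1;
  private static final long REPLICATION_MASK = (1L << REPLICATION_BITS) - 1;
  private static final long CACHE_REPL_MASK = (1L << CACHE_REPL_BITS) - 1;

  static long getPreferredBlockSize(long header) {
    return header & BLOCK_SIZE_MASK;
  }

  static short getReplication(long header) {
    return (short) ((header >>> BLOCK_SIZE_BITS) & REPLICATION_MASK);
  }

  static short getCacheReplication(long header) {
    return (short) ((header >>> (BLOCK_SIZE_BITS + REPLICATION_BITS)) & CACHE_REPL_MASK);
  }

  /** Packs the three values, assuming each fits in its field. */
  static long combine(short cacheReplication, short replication, long blockSize) {
    assert (blockSize & ~BLOCK_SIZE_MASK) == 0;
    assert (replication & ~REPLICATION_MASK) == 0;
    assert (cacheReplication & ~CACHE_REPL_MASK) == 0;
    return ((long) cacheReplication << (BLOCK_SIZE_BITS + REPLICATION_BITS))
        | ((long) replication << BLOCK_SIZE_BITS)
        | blockSize;
  }
}
{code}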
                
> NameNode should invoke DataNode APIs to coordinate caching
> ----------------------------------------------------------
>
>                 Key: HDFS-5053
>                 URL: https://issues.apache.org/jira/browse/HDFS-5053
>             Project: Hadoop HDFS
>          Issue Type: Sub-task
>          Components: datanode, namenode
>            Reporter: Colin Patrick McCabe
>            Assignee: Andrew Wang
>         Attachments: hdfs-5053-1.patch
>
>
> The NameNode should invoke the DataNode APIs to coordinate caching.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
