hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From z..@apache.org
Subject [21/50] [abbrv] hadoop git commit: HDFS-8781. Erasure Coding: Correctly handle BlockManager#InvalidateBlocks for striped block. Contributed by Yi Liu.
Date Thu, 20 Aug 2015 06:27:45 GMT
HDFS-8781. Erasure Coding: Correctly handle BlockManager#InvalidateBlocks for striped block.
Contributed by Yi Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5956d23b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5956d23b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5956d23b

Branch: refs/heads/HDFS-7285-merge
Commit: 5956d23b645e272748e2edca4c30231e729b96a9
Parents: f8f7a92
Author: Jing Zhao <jing9@apache.org>
Authored: Tue Jul 21 17:00:49 2015 -0700
Committer: Jing Zhao <jing9@apache.org>
Committed: Tue Jul 21 17:00:49 2015 -0700

----------------------------------------------------------------------
 .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt        |  3 +
 .../server/blockmanagement/BlockManager.java    | 33 ++++++--
 .../blockmanagement/DatanodeDescriptor.java     |  7 ++
 .../hdfs/TestReadStripedFileWithDecoding.java   | 83 +++++++++++++++-----
 4 files changed, 98 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5956d23b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
index 10a8cde..ad1e4e7 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt
@@ -367,3 +367,6 @@
 
     HDFS-8760. Erasure Coding: reuse BlockReader when reading the same block in pread.
     (jing9)
+
+    HDFS-8781. Erasure Coding: Correctly handle BlockManager#InvalidateBlocks for
+    striped block. (Yi Liu via jing9)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5956d23b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 7872baa..1594a9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -783,7 +783,10 @@ public class BlockManager {
 
     // remove this block from the list of pending blocks to be deleted. 
     for (DatanodeStorageInfo storage : targets) {
-      invalidateBlocks.remove(storage.getDatanodeDescriptor(), oldBlock);
+      final Block b = getBlockOnStorage(oldBlock, storage);
+      if (b != null) {
+        invalidateBlocks.remove(storage.getDatanodeDescriptor(), b);
+      }
     }
     
     // Adjust safe-mode totals, since under-construction blocks don't
@@ -802,12 +805,14 @@ public class BlockManager {
   /**
    * Get all valid locations of the block
    */
-  private List<DatanodeStorageInfo> getValidLocations(Block block) {
+  private List<DatanodeStorageInfo> getValidLocations(BlockInfo block) {
     final List<DatanodeStorageInfo> locations
         = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
     for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
       // filter invalidate replicas
-      if(!invalidateBlocks.contains(storage.getDatanodeDescriptor(), block)) {
+      Block b = getBlockOnStorage(block, storage);
+      if(b != null && 
+          !invalidateBlocks.contains(storage.getDatanodeDescriptor(), b)) {
         locations.add(storage);
       }
     }
@@ -1156,7 +1161,10 @@ public class BlockManager {
     while(it.hasNext()) {
       BlockInfo block = it.next();
       removeStoredBlock(block, node);
-      invalidateBlocks.remove(node, block);
+      final Block b = getBlockOnStorage(block, storageInfo);
+      if (b != null) {
+        invalidateBlocks.remove(node, b);
+      }
     }
     namesystem.checkSafeMode();
   }
@@ -1184,7 +1192,7 @@ public class BlockManager {
     for(DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock,
         State.NORMAL)) {
       final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      final Block b = getBlockToInvalidate(storedBlock, storage);
+      final Block b = getBlockOnStorage(storedBlock, storage);
       if (b != null) {
         invalidateBlocks.add(b, node, false);
         datanodes.append(node).append(" ");
@@ -1196,7 +1204,7 @@ public class BlockManager {
     }
   }
 
-  private Block getBlockToInvalidate(BlockInfo storedBlock,
+  private Block getBlockOnStorage(BlockInfo storedBlock,
       DatanodeStorageInfo storage) {
     return storedBlock.isStriped() ?
         ((BlockInfoStriped) storedBlock).getBlockOnStorage(storage) : storedBlock;
@@ -2054,7 +2062,10 @@ public class BlockManager {
       // more than one storage on a datanode (and because it's a difficult
       // assumption to really enforce)
       removeStoredBlock(block, zombie.getDatanodeDescriptor());
-      invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
+      Block b = getBlockOnStorage(block, zombie);
+      if (b != null) {
+        invalidateBlocks.remove(zombie.getDatanodeDescriptor(), b);
+      }
     }
     assert(zombie.numBlocks() == 0);
     LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
@@ -3273,7 +3284,7 @@ public class BlockManager {
     // should be deleted.  Items are removed from the invalidate list
     // upon giving instructions to the datanodes.
     //
-    final Block blockToInvalidate = getBlockToInvalidate(storedBlock, chosen);
+    final Block blockToInvalidate = getBlockOnStorage(storedBlock, chosen);
     addToInvalidates(blockToInvalidate, chosen.getDatanodeDescriptor());
     blockLog.info("BLOCK* chooseExcessReplicates: "
         +"({}, {}) is added to invalidated blocks set", chosen, storedBlock);
@@ -3838,6 +3849,12 @@ public class BlockManager {
     return toInvalidate.size();
   }
 
+  @VisibleForTesting
+  public boolean containsInvalidateBlock(final DatanodeInfo dn,
+      final Block block) {
+    return invalidateBlocks.contains(dn, block);
+  }
+
   boolean blockHasEnoughRacks(BlockInfo storedBlock, int expectedStorageNum) {
     if (!this.shouldCheckForEnoughRacks) {
       return true;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5956d23b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 47bc765..e4366c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -694,6 +694,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  @VisibleForTesting
+  public boolean containsInvalidateBlock(Block block) {
+    synchronized (invalidateBlocks) {
+      return invalidateBlocks.contains(block);
+    }
+  }
+
   /**
    * @return Approximate number of blocks currently scheduled to be written 
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5956d23b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
index 34d6034..8afea19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -22,13 +22,16 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.junit.After;
 import org.junit.Assert;
@@ -274,28 +277,68 @@ public class TestReadStripedFileWithDecoding {
       DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
     }
 
-    // do stateful read
-    ByteBuffer result = ByteBuffer.allocate(length);
-    ByteBuffer buf = ByteBuffer.allocate(1024);
-    int readLen = 0;
-    int ret;
-    try (FSDataInputStream in = fs.open(file)) {
-      while ((ret = in.read(buf)) >= 0) {
-        readLen += ret;
-        buf.flip();
-        result.put(buf);
-        buf.clear();
+    try {
+      // do stateful read
+      ByteBuffer result = ByteBuffer.allocate(length);
+      ByteBuffer buf = ByteBuffer.allocate(1024);
+      int readLen = 0;
+      int ret;
+      try (FSDataInputStream in = fs.open(file)) {
+        while ((ret = in.read(buf)) >= 0) {
+          readLen += ret;
+          buf.flip();
+          result.put(buf);
+          buf.clear();
+        }
+      }
+      Assert.assertEquals("The length of file should be the same to write size",
+          length, readLen);
+      Assert.assertArrayEquals(bytes, result.array());
+
+      // check whether the corruption has been reported to the NameNode
+      final FSNamesystem ns = cluster.getNamesystem();
+      final BlockManager bm = ns.getBlockManager();
+      BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
+          .asFile().getBlocks())[0];
+      Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
+    } finally {
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
       }
     }
-    Assert.assertEquals("The length of file should be the same to write size",
-        length, readLen);
-    Assert.assertArrayEquals(bytes, result.array());
+  }
+
+  @Test
+  public void testInvalidateBlock() throws IOException {
+    final Path file = new Path("/invalidate");
+    final int length = 10;
+    final byte[] bytes = StripedFileTestUtil.generateBytes(length);
+    DFSTestUtil.writeFile(fs, file, bytes);
 
-    // check whether the corruption has been reported to the NameNode
-    final FSNamesystem ns = cluster.getNamesystem();
-    final BlockManager bm = ns.getBlockManager();
-    BlockInfo blockInfo = (ns.getFSDirectory().getINode4Write(file.toString())
-        .asFile().getBlocks())[0];
-    Assert.assertEquals(1, bm.getCorruptReplicas(blockInfo).size());
+    int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+    Assert.assertNotEquals(-1, dnIndex);
+    LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
+        .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+    final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
+        cellSize, dataBlocks, parityBlocks);
+    final Block b = blks[0].getBlock().getLocalBlock();
+
+    DataNode dn = cluster.getDataNodes().get(dnIndex);
+    // disable the heartbeat from DN so that the invalidated block record is kept
+    // in NameNode until heartbeat expires and NN mark the dn as dead
+    DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
+
+    try {
+      // delete the file
+      fs.delete(file, true);
+      // check the block is added to invalidateBlocks
+      final FSNamesystem fsn = cluster.getNamesystem();
+      final BlockManager bm = fsn.getBlockManager();
+      DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+      Assert.assertTrue(bm.containsInvalidateBlock(
+          blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
+    } finally {
+      DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
+    }
   }
 }


Mime
View raw message