hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r1161992 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ ...
Date Fri, 26 Aug 2011 04:46:42 GMT
Author: hairong
Date: Fri Aug 26 04:46:42 2011
New Revision: 1161992

URL: http://svn.apache.org/viewvc?rev=1161992&view=rev
Log:
HDFS-395.  DFS Scalability: Incremental block reports. Contributed by Tomasz Nykiel.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1161992&r1=1161991&r2=1161992&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Aug 26 04:46:42 2011
@@ -2,7 +2,7 @@ Hadoop HDFS Change Log
 
 Trunk (unreleased changes)
   NEW FEATURES
-    HDFS-349.  DFS Scalability: Incremental block reports. (Tomasz Nykiel
+    HDFS-395.  DFS Scalability: Incremental block reports. (Tomasz Nykiel
                via hairong)
 
 Release 0.23.0 - Unreleased

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1161992&r1=1161991&r2=1161992&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Fri Aug 26 04:46:42 2011
@@ -59,10 +59,12 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
@@ -2002,7 +2004,7 @@ public class BlockManager {
    * Modify (block-->datanode) map. Possibly generate replication tasks, if the
    * removed block is still valid.
    */
-  private void removeStoredBlock(Block block, DatanodeDescriptor node) {
+  public void removeStoredBlock(Block block, DatanodeDescriptor node) {
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("BLOCK* removeStoredBlock: "
           + block + " from " + node.getName());
@@ -2121,27 +2123,48 @@ public class BlockManager {
     }
   }
 
-  /** The given node is reporting that it received a certain block. */
-  public void blockReceived(final DatanodeID nodeID, final String poolId,
-      final Block block, final String delHint) throws IOException {
+  /** The given node is reporting that it received/deleted certain blocks. */
+  public void blockReceivedAndDeleted(final DatanodeID nodeID, 
+     final String poolId, 
+     final ReceivedDeletedBlockInfo receivedAndDeletedBlocks[]
+  ) throws IOException {
     namesystem.writeLock();
+    int received = 0;
+    int deleted = 0;
     try {
       final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
       if (node == null || !node.isAlive) {
-        final String s = block + " is received from dead or unregistered node "
-            + nodeID.getName();
-        NameNode.stateChangeLog.warn("BLOCK* blockReceived: " + s);
-        throw new IOException(s);
-      } 
-
-      if (NameNode.stateChangeLog.isDebugEnabled()) {
-        NameNode.stateChangeLog.debug("BLOCK* blockReceived: " + block
-            + " is received from " + nodeID.getName());
+        NameNode.stateChangeLog
+            .warn("BLOCK* blockReceivedDeleted"
+                + " is received from dead or unregistered node "
+                + nodeID.getName());
+        throw new IOException(
+            "Got blockReceivedDeleted message from unregistered or dead node");
       }
 
-      addBlock(node, block, delHint);
+      for (int i = 0; i < receivedAndDeletedBlocks.length; i++) {
+        if (receivedAndDeletedBlocks[i].isDeletedBlock()) {
+          removeStoredBlock(
+              receivedAndDeletedBlocks[i].getBlock(), node);
+          deleted++;
+        } else {
+          addBlock(node, receivedAndDeletedBlocks[i].getBlock(),
+              receivedAndDeletedBlocks[i].getDelHints());
+          received++;
+        }
+        if (NameNode.stateChangeLog.isDebugEnabled()) {
+          NameNode.stateChangeLog.debug("BLOCK* block"
+              + (receivedAndDeletedBlocks[i].isDeletedBlock() ? "Deleted"
+                  : "Received") + ": " + receivedAndDeletedBlocks[i].getBlock()
+              + " is received from " + nodeID.getName());
+        }
+      }
     } finally {
       namesystem.writeUnlock();
+      NameNode.stateChangeLog
+          .debug("*BLOCK* NameNode.blockReceivedAndDeleted: " + "from "
+              + nodeID.getName() + " received: " + received + ", "
+              + " deleted: " + deleted);
     }
   }
 
@@ -2316,6 +2339,7 @@ public class BlockManager {
   }
 
   public void removeBlock(Block block) {
+    block.setNumBytes(BlockCommand.NO_ACK);
     addToInvalidates(block);
     corruptReplicas.removeFromCorruptReplicasMap(block);
     blocksMap.removeBlock(block);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1161992&r1=1161991&r2=1161992&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Aug 26 04:46:42 2011
@@ -106,6 +106,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
@@ -348,6 +349,8 @@ public class DataNode extends Configured
   ThreadGroup threadGroup = null;
   long blockReportInterval;
   boolean resetBlockReportTime = true;
+  long deleteReportInterval;
+  long lastDeletedReport = 0;
   long initialBlockReportDelay = DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT * 1000L;
   long heartBeatInterval;
   private boolean heartbeatsDisabledForTests = false;
@@ -458,6 +461,7 @@ public class DataNode extends Configured
     this.heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;
 
+    this.deleteReportInterval = 100 * heartBeatInterval;
     // do we need to sync block file contents to disk when blockfile is closed?
     this.syncOnClose = conf.getBoolean(DFS_DATANODE_SYNCONCLOSE_KEY, 
                                        DFS_DATANODE_SYNCONCLOSE_DEFAULT);
@@ -643,6 +647,17 @@ public class DataNode extends Configured
     }
   }
   
+  // calls specific to BP
+  protected void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+    BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
+    if (bpos != null) {
+      bpos.notifyNamenodeDeletedBlock(block);
+    } else {
+      LOG.warn("Cannot find BPOfferService for reporting block deleted for bpid="
+          + block.getBlockPoolId());
+    }
+  }
+  
   public void reportBadBlocks(ExtendedBlock block) throws IOException{
     BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
     if(bpos == null || bpos.bpNamenode == null) {
@@ -677,8 +692,9 @@ public class DataNode extends Configured
     private String blockPoolId;
     private long lastHeartbeat = 0;
     private volatile boolean initialized = false;
-    private final LinkedList<Block> receivedBlockList = new LinkedList<Block>();
-    private final LinkedList<String> delHints = new LinkedList<String>();
+    private final LinkedList<ReceivedDeletedBlockInfo> receivedAndDeletedBlockList
+      = new LinkedList<ReceivedDeletedBlockInfo>();
+    private volatile int pendingReceivedRequests = 0;
     private volatile boolean shouldServiceRun = true;
     private boolean isBlockTokenInitialized = false;
     UpgradeManagerDatanode upgradeManager = null;
@@ -848,41 +864,33 @@ public class DataNode extends Configured
     
     /**
      * Report received blocks and delete hints to the Namenode
+     * 
      * @throws IOException
      */
-    private void reportReceivedBlocks() throws IOException {
-      //check if there are newly received blocks
-      Block [] blockArray=null;
-      String [] delHintArray=null;
-      synchronized(receivedBlockList) {
-        synchronized(delHints){
-          int numBlocks = receivedBlockList.size();
-          if (numBlocks > 0) {
-            if(numBlocks!=delHints.size()) {
-              LOG.warn("Panic: receiveBlockList and delHints are not of " +
-              "the same length" );
-            }
-            //
-            // Send newly-received blockids to namenode
-            //
-            blockArray = receivedBlockList.toArray(new Block[numBlocks]);
-            delHintArray = delHints.toArray(new String[numBlocks]);
-          }
+    private void reportReceivedDeletedBlocks() throws IOException {
+
+      // check if there are newly received blocks
+      ReceivedDeletedBlockInfo[] receivedAndDeletedBlockArray = null;
+      int currentReceivedRequestsCounter;
+      synchronized (receivedAndDeletedBlockList) {
+        currentReceivedRequestsCounter = pendingReceivedRequests;
+        int numBlocks = receivedAndDeletedBlockList.size();
+        if (numBlocks > 0) {
+          //
+          // Send newly-received and deleted blockids to namenode
+          //
+          receivedAndDeletedBlockArray = receivedAndDeletedBlockList
+              .toArray(new ReceivedDeletedBlockInfo[numBlocks]);
         }
       }
-      if (blockArray != null) {
-        if(delHintArray == null || delHintArray.length != blockArray.length ) {
-          LOG.warn("Panic: block array & delHintArray are not the same" );
-        }
-        bpNamenode.blockReceived(bpRegistration, blockPoolId, blockArray,
-            delHintArray);
-        synchronized(receivedBlockList) {
-          synchronized(delHints){
-            for(int i=0; i<blockArray.length; i++) {
-              receivedBlockList.remove(blockArray[i]);
-              delHints.remove(delHintArray[i]);
-            }
+      if (receivedAndDeletedBlockArray != null) {
+        bpNamenode.blockReceivedAndDeleted(bpRegistration, blockPoolId,
+            receivedAndDeletedBlockArray);
+        synchronized (receivedAndDeletedBlockList) {
+          for (int i = 0; i < receivedAndDeletedBlockArray.length; i++) {
+            receivedAndDeletedBlockList.remove(receivedAndDeletedBlockArray[i]);
           }
+          pendingReceivedRequests -= currentReceivedRequestsCounter;
         }
       }
     }
@@ -893,23 +901,39 @@ public class DataNode extends Configured
      * client? For now we don't.
      */
     void notifyNamenodeReceivedBlock(ExtendedBlock block, String delHint) {
-      if(block==null || delHint==null) {
-        throw new IllegalArgumentException(
-            block==null?"Block is null":"delHint is null");
+      if (block == null || delHint == null) {
+        throw new IllegalArgumentException(block == null ? "Block is null"
+            : "delHint is null");
       }
-      
+
       if (!block.getBlockPoolId().equals(blockPoolId)) {
-        LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + 
-            " vs. " + blockPoolId);
+        LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+            + blockPoolId);
         return;
       }
-      
-      synchronized (receivedBlockList) {
-        synchronized (delHints) {
-          receivedBlockList.add(block.getLocalBlock());
-          delHints.add(delHint);
-          receivedBlockList.notifyAll();
-        }
+
+      synchronized (receivedAndDeletedBlockList) {
+        receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
+            .getLocalBlock(), delHint));
+        pendingReceivedRequests++;
+        receivedAndDeletedBlockList.notifyAll();
+      }
+    }
+
+    void notifyNamenodeDeletedBlock(ExtendedBlock block) {
+      if (block == null) {
+        throw new IllegalArgumentException("Block is null");
+      }
+
+      if (!block.getBlockPoolId().equals(blockPoolId)) {
+        LOG.warn("BlockPool mismatch " + block.getBlockPoolId() + " vs. "
+            + blockPoolId);
+        return;
+      }
+
+      synchronized (receivedAndDeletedBlockList) {
+        receivedAndDeletedBlockList.add(new ReceivedDeletedBlockInfo(block
+            .getLocalBlock(), ReceivedDeletedBlockInfo.TODELETE_HINT));
       }
     }
 
@@ -1027,7 +1051,8 @@ public class DataNode extends Configured
      * forever calling remote NameNode functions.
      */
     private void offerService() throws Exception {
-      LOG.info("For namenode " + nnAddr + " using BLOCKREPORT_INTERVAL of "
+      LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
+          + deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
           + blockReportInterval + "msec" + " Initial delay: "
           + initialBlockReportDelay + "msec" + "; heartBeatInterval="
           + heartBeatInterval);
@@ -1058,8 +1083,11 @@ public class DataNode extends Configured
                 continue;
             }
           }
-
-          reportReceivedBlocks();
+          if (pendingReceivedRequests > 0
+              || (startTime - lastDeletedReport > deleteReportInterval)) {
+            reportReceivedDeletedBlocks();
+            lastDeletedReport = startTime;
+          }
 
           DatanodeCommand cmd = blockReport();
           processCommand(cmd);
@@ -1075,10 +1103,10 @@ public class DataNode extends Configured
           //
           long waitTime = heartBeatInterval - 
           (System.currentTimeMillis() - lastHeartbeat);
-          synchronized(receivedBlockList) {
-            if (waitTime > 0 && receivedBlockList.size() == 0) {
+          synchronized(receivedAndDeletedBlockList) {
+            if (waitTime > 0 && pendingReceivedRequests == 0) {
               try {
-                receivedBlockList.wait(waitTime);
+                receivedAndDeletedBlockList.wait(waitTime);
               } catch (InterruptedException ie) {
                 LOG.warn("BPOfferService for block pool="
                     + this.getBlockPoolId() + " received exception:" + ie);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1161992&r1=1161991&r2=1161992&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Aug 26 04:46:42 2011
@@ -1182,7 +1182,7 @@ public class FSDataset implements FSData
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
       roots[idx] = storage.getStorageDir(idx).getCurrentDir();
     }
-    asyncDiskService = new FSDatasetAsyncDiskService(roots);
+    asyncDiskService = new FSDatasetAsyncDiskService(this, roots);
     registerMBean(storage.getStorageID());
   }
 
@@ -2089,15 +2089,19 @@ public class FSDataset implements FSData
       }
       File metaFile = getMetaFile(f, invalidBlks[i].getGenerationStamp());
       long dfsBytes = f.length() + metaFile.length();
-      
+
       // Delete the block asynchronously to make sure we can do it fast enough
-      asyncDiskService.deleteAsync(v, bpid, f, metaFile, dfsBytes,
-          invalidBlks[i].toString());
+      asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes,
+          new ExtendedBlock(bpid, invalidBlks[i]));
     }
     if (error) {
       throw new IOException("Error in deleting blocks.");
     }
   }
+  
+  public void notifyNamenodeDeletedBlock(ExtendedBlock block){
+    datanode.notifyNamenodeDeletedBlock(block);
+  }
 
   /**
    * Turn the block identifier into a filename; ignore generation stamp!!!

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1161992&r1=1161991&r2=1161992&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Fri Aug 26 04:46:42 2011
@@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 
 /*
  * This class is a container of multiple thread pools, each for a volume,
@@ -47,6 +49,8 @@ import org.apache.commons.logging.LogFac
  */
 class FSDatasetAsyncDiskService {
   
+  final FSDataset dataset;
+  
   public static final Log LOG = LogFactory.getLog(FSDatasetAsyncDiskService.class);
   
   // ThreadPool core pool size
@@ -70,8 +74,8 @@ class FSDatasetAsyncDiskService {
    * 
    * @param volumes The roots of the data volumes.
    */
-  FSDatasetAsyncDiskService(File[] volumes) {
-
+  FSDatasetAsyncDiskService(FSDataset dataset, File[] volumes) {
+    this.dataset = dataset;
     // Create one ThreadPool per volume
     for (int v = 0 ; v < volumes.length; v++) {
       final File vol = volumes[v];
@@ -147,13 +151,12 @@ class FSDatasetAsyncDiskService {
    * Delete the block file and meta file from the disk asynchronously, adjust
    * dfsUsed statistics accordingly.
    */
-  void deleteAsync(FSDataset.FSVolume volume, String bpid, File blockFile,
-      File metaFile, long dfsBytes, String blockName) {
-    DataNode.LOG.info("Scheduling block " + blockName + " file " + blockFile
-        + " for deletion");
-    ReplicaFileDeleteTask deletionTask = 
-        new ReplicaFileDeleteTask(volume, bpid, blockFile, metaFile, dfsBytes,
-            blockName);
+  void deleteAsync(FSDataset.FSVolume volume, File blockFile, File metaFile,
+      long dfsBytes, ExtendedBlock block) {
+    DataNode.LOG.info("Scheduling block " + block.getLocalBlock().toString()
+        + " file " + blockFile + " for deletion");
+    ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(dataset,
+        volume, blockFile, metaFile, dfsBytes, block);
     execute(volume.getCurrentDir(), deletionTask);
   }
   
@@ -161,21 +164,21 @@ class FSDatasetAsyncDiskService {
    *  as decrement the dfs usage of the volume. 
    */
   static class ReplicaFileDeleteTask implements Runnable {
+    final FSDataset dataset;
     final FSDataset.FSVolume volume;
-    final String blockPoolId;
     final File blockFile;
     final File metaFile;
     final long dfsBytes;
-    final String blockName;
+    final ExtendedBlock block;
     
-    ReplicaFileDeleteTask(FSDataset.FSVolume volume, String bpid,
-        File blockFile, File metaFile, long dfsBytes, String blockName) {
+    ReplicaFileDeleteTask(FSDataset dataset, FSDataset.FSVolume volume, File blockFile,
+        File metaFile, long dfsBytes, ExtendedBlock block) {
+      this.dataset = dataset;
       this.volume = volume;
-      this.blockPoolId = bpid;
       this.blockFile = blockFile;
       this.metaFile = metaFile;
       this.dfsBytes = dfsBytes;
-      this.blockName = blockName;
+      this.block = block;
     }
     
     FSDataset.FSVolume getVolume() {
@@ -185,21 +188,24 @@ class FSDatasetAsyncDiskService {
     @Override
     public String toString() {
       // Called in AsyncDiskService.execute for displaying error messages.
-      return "deletion of block " + blockPoolId + " " + blockName
-          + " with block file " + blockFile + " and meta file " + metaFile
-          + " from volume " + volume;
+      return "deletion of block " + block.getBlockPoolId() + " "
+          + block.getLocalBlock().toString() + " with block file " + blockFile
+          + " and meta file " + metaFile + " from volume " + volume;
     }
 
     @Override
     public void run() {
       if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
         DataNode.LOG.warn("Unexpected error trying to delete block "
-            + blockPoolId + " " + blockName + " at file " + blockFile
-            + ". Ignored.");
+            + block.getBlockPoolId() + " " + block.getLocalBlock().toString()
+            + " at file " + blockFile + ". Ignored.");
       } else {
-        volume.decDfsUsed(blockPoolId, dfsBytes);
-        DataNode.LOG.info("Deleted block " + blockPoolId + " " + blockName
-            + " at file " + blockFile);
+        if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
+          dataset.notifyNamenodeDeletedBlock(block);
+        }
+        volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
+        DataNode.LOG.info("Deleted block " + block.getBlockPoolId() + " "
+            + block.getLocalBlock().toString() + " at file " + blockFile);
       }
     }
   };

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1161992&r1=1161991&r2=1161992&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Aug 26 04:46:42 2011
@@ -83,6 +83,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.EnumSetWritable;
@@ -1209,17 +1210,16 @@ public class NameNode implements Namenod
   }
 
   @Override // DatanodeProtocol
-  public void blockReceived(DatanodeRegistration nodeReg, String poolId,
-      Block blocks[], String delHints[]) throws IOException {
+  public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
+      ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks) throws IOException {
     verifyRequest(nodeReg);
     if(stateChangeLog.isDebugEnabled()) {
-      stateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
-          +"from "+nodeReg.getName()+" "+blocks.length+" blocks.");
-    }
-    for (int i = 0; i < blocks.length; i++) {
-      namesystem.getBlockManager().blockReceived(
-          nodeReg, poolId, blocks[i], delHints[i]);
+      stateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: "
+          +"from "+nodeReg.getName()+" "+receivedAndDeletedBlocks.length
+          +" blocks.");
     }
+    namesystem.getBlockManager().blockReceivedAndDeleted(
+        nodeReg, poolId, receivedAndDeletedBlocks);
   }
 
   @Override // DatanodeProtocol

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1161992&r1=1161991&r2=1161992&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Fri Aug 26 04:46:42 2011
@@ -44,6 +44,16 @@ import org.apache.hadoop.io.WritableFact
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class BlockCommand extends DatanodeCommand {
+  
+  /**
+   * This constant is used to indicate that the block deletion does not need
+   * explicit ACK from the datanode. When a block is put into the list of blocks
+   * to be deleted, it's size is set to this constant. We assume that no block
+   * would actually have this size. Otherwise, we would miss ACKs for blocks
+   * with such size. Positive number is used for compatibility reasons.
+   */
+  public static final long NO_ACK = Long.MAX_VALUE;
+  
   String poolId;
   Block blocks[];
   DatanodeInfo targets[][];

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1161992&r1=1161991&r2=1161992&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Fri Aug 26 04:46:42 2011
@@ -126,17 +126,19 @@ public interface DatanodeProtocol extend
                                      long[] blocks) throws IOException;
     
   /**
-   * blockReceived() allows the DataNode to tell the NameNode about
-   * recently-received block data, with a hint for pereferred replica
-   * to be deleted when there is any excessive blocks.
+   * blockReceivedAndDeleted() allows the DataNode to tell the NameNode about
+   * recently-received and -deleted block data. 
+   * 
+   * For the case of received blocks, a hint for preferred replica to be 
+   * deleted when there is any excessive blocks is provided.
    * For example, whenever client code
    * writes a new Block here, or another DataNode copies a Block to
    * this DataNode, it will call blockReceived().
    */
-  public void blockReceived(DatanodeRegistration registration,
+  public void blockReceivedAndDeleted(DatanodeRegistration registration,
                             String poolId,
-                            Block blocks[],
-                            String[] delHints) throws IOException;
+                            ReceivedDeletedBlockInfo[] receivedAndDeletedBlocks)
+                            throws IOException;
 
   /**
    * errorReport() tells the NameNode about something that has gone

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java?rev=1161992&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ReceivedDeletedBlockInfo.java Fri Aug 26 04:46:42 2011
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A data structure to store Block and delHints together, used to send
+ * received/deleted ACKs.
+ */
+public class ReceivedDeletedBlockInfo implements Writable {
+  Block block;
+  String delHints;
+
+  public final static String TODELETE_HINT = "-";
+
+  public ReceivedDeletedBlockInfo() {
+  }
+
+  public ReceivedDeletedBlockInfo(Block blk, String delHints) {
+    this.block = blk;
+    this.delHints = delHints;
+  }
+
+  public Block getBlock() {
+    return this.block;
+  }
+
+  public void setBlock(Block blk) {
+    this.block = blk;
+  }
+
+  public String getDelHints() {
+    return this.delHints;
+  }
+
+  public void setDelHints(String hints) {
+    this.delHints = hints;
+  }
+
+  public boolean equals(Object o) {
+    if (!(o instanceof ReceivedDeletedBlockInfo)) {
+      return false;
+    }
+    ReceivedDeletedBlockInfo other = (ReceivedDeletedBlockInfo) o;
+    return this.block.equals(other.getBlock())
+        && this.delHints.equals(other.delHints);
+  }
+
+  public int hashCode() {
+    assert false : "hashCode not designed";
+    return 0; 
+  }
+
+  public boolean blockEquals(Block b) {
+    return this.block.equals(b);
+  }
+
+  public boolean isDeletedBlock() {
+    return delHints.equals(TODELETE_HINT);
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    this.block.write(out);
+    Text.writeString(out, this.delHints);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    this.block = new Block();
+    this.block.readFields(in);
+    this.delHints = Text.readString(in);
+  }
+
+  public String toString() {
+    return block.toString() + ", delHint: " + delHints;
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=1161992&r1=1161991&r2=1161992&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Fri Aug 26 04:46:42 2011
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.io.EnumSetWritable;
@@ -877,10 +878,10 @@ public class NNThroughputBenchmark {
           receivedDNReg.setStorageInfo(
                           new DataStorage(nsInfo, dnInfo.getStorageID()));
           receivedDNReg.setInfoPort(dnInfo.getInfoPort());
-          nameNode.blockReceived( receivedDNReg, 
-                                  nameNode.getNamesystem().getBlockPoolId(),
-                                  new Block[] {blocks[i]},
-                                  new String[] {DataNode.EMPTY_DEL_HINT});
+          nameNode.blockReceivedAndDeleted(receivedDNReg, nameNode
+              .getNamesystem().getBlockPoolId(),
+              new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(
+                  blocks[i], DataNode.EMPTY_DEL_HINT) });
         }
       }
       return blocks.length;
@@ -992,11 +993,10 @@ public class NNThroughputBenchmark {
         for(DatanodeInfo dnInfo : loc.getLocations()) {
           int dnIdx = Arrays.binarySearch(datanodes, dnInfo.getName());
           datanodes[dnIdx].addBlock(loc.getBlock().getLocalBlock());
-          nameNode.blockReceived(
-              datanodes[dnIdx].dnRegistration, 
-              loc.getBlock().getBlockPoolId(),
-              new Block[] {loc.getBlock().getLocalBlock()},
-              new String[] {""});
+          nameNode.blockReceivedAndDeleted(datanodes[dnIdx].dnRegistration, loc
+              .getBlock().getBlockPoolId(),
+              new ReceivedDeletedBlockInfo[] { new ReceivedDeletedBlockInfo(loc
+                  .getBlock().getLocalBlock(), "") });
         }
       }
       return prevBlock;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java?rev=1161992&r1=1161991&r2=1161992&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java Fri Aug 26 04:46:42 2011
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.junit.After;
 import org.junit.Test;
 
@@ -104,12 +105,12 @@ public class TestDeadDatanode {
 
     DatanodeProtocol dnp = cluster.getNameNode();
     
-    Block[] blocks = new Block[] { new Block(0) };
-    String[] delHints = new String[] { "" };
+    ReceivedDeletedBlockInfo[] blocks = { new ReceivedDeletedBlockInfo(
+        new Block(0), "") };
     
     // Ensure blockReceived call from dead datanode is rejected with IOException
     try {
-      dnp.blockReceived(reg, poolId, blocks, delHints);
+      dnp.blockReceivedAndDeleted(reg, poolId, blocks);
       Assert.fail("Expected IOException is not thrown");
     } catch (IOException ex) {
       // Expected



Mime
View raw message