hadoop-common-commits mailing list archives

From hair...@apache.org
Subject svn commit: r740077 - in /hadoop/core/trunk: ./ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/hdfs/org/apache/hadoop/hdfs/server/protocol/ src/test/org/apache/hadoop/hdfs/server/namenode/
Date Mon, 02 Feb 2009 19:03:15 GMT
Author: hairong
Date: Mon Feb  2 19:03:12 2009
New Revision: 740077

URL: http://svn.apache.org/viewvc?rev=740077&view=rev
Log:
HADOOP-5034. NameNode should send both replication and deletion requests to DataNode in one
reply to a heartbeat. Contributed by Hairong Kuang.
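
In short, the heartbeat reply changes from a single command to a batch, so a
replication request and a deletion request no longer have to wait for separate
heartbeat intervals. A minimal before/after sketch, distilled from the diff
below (the local variable names are illustrative, not the exact field names):

    // Before (DatanodeProtocol version 18): at most one command per reply.
    DatanodeCommand cmd = namenode.sendHeartbeat(dnRegistration,
        capacity, dfsUsed, remaining, xmitsInProgress, xceiverCount);

    // After (version 19): the reply is an array, so a DNA_TRANSFER
    // (replication) and a DNA_INVALIDATE (deletion) can arrive together.
    DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
        capacity, dfsUsed, remaining, xmitsInProgress, xceiverCount);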


Added:
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java
Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
    hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Mon Feb  2 19:03:12 2009
@@ -741,6 +741,9 @@
     HADOOP-4862. Minor : HADOOP-3678 did not remove all the cases of 
     spurious IOExceptions logged by DataNode. (Raghu Angadi) 
 
+    HADOOP-5034. NameNode should send both replication and deletion requests
+    to DataNode in one reply to a heartbeat. (hairong)
+
 Release 0.19.0 - 2008-11-18
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Feb  2 19:03:12 2009
@@ -694,7 +694,7 @@
           // -- Bytes remaining
           //
           lastHeartbeat = startTime;
-          DatanodeCommand cmd = namenode.sendHeartbeat(dnRegistration,
+          DatanodeCommand[] cmds = namenode.sendHeartbeat(dnRegistration,
                                                        data.getCapacity(),
                                                        data.getDfsUsed(),
                                                        data.getRemaining(),
@@ -702,7 +702,7 @@
                                                        getXceiverCount());
           myMetrics.heartbeats.inc(now() - startTime);
           //LOG.info("Just sent heartbeat, with name " + localName);
-          if (!processCommand(cmd))
+          if (!processCommand(cmds))
             continue;
         }
             
@@ -812,6 +812,27 @@
     } // while (shouldRun)
   } // offerService
 
+  /**
+   * Process an array of datanode commands
+   * 
+   * @param cmds an array of datanode commands
+   * @return true if further processing may be required, false otherwise.
+   */
+  private boolean processCommand(DatanodeCommand[] cmds) {
+    if (cmds != null) {
+      for (DatanodeCommand cmd : cmds) {
+        try {
+          if (!processCommand(cmd)) {
+            return false;
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Error processing datanode Command", ioe);
+        }
+      }
+    }
+    return true;
+  }
+  
     /**
      * 
      * @param cmd

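A note on the new wrapper above: each element is fed through the existing
single-command processCommand(DatanodeCommand), an IOException from one
command is logged and swallowed so that a failing command does not block the
rest of the batch, and only an explicit false return (the old short-circuit
signal in offerService) stops the loop.
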
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Feb  2 19:03:12 2009
@@ -2132,10 +2132,10 @@
    * If a substantial amount of time passed since the last datanode 
    * heartbeat then request an immediate block report.  
    * 
-   * @return a datanode command 
+   * @return an array of datanode commands 
    * @throws IOException
    */
-  DatanodeCommand handleHeartbeat(DatanodeRegistration nodeReg,
+  DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       long capacity, long dfsUsed, long remaining,
       int xceiverCount, int xmitsInProgress) throws IOException {
     DatanodeCommand cmd = null;
@@ -2145,7 +2145,7 @@
         try {
           nodeinfo = getDatanode(nodeReg);
         } catch(UnregisteredDatanodeException e) {
-          return DatanodeCommand.REGISTER;
+          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
         }
           
         // Check if this datanode should actually be shutdown instead. 
@@ -2155,7 +2155,7 @@
         }
 
         if (nodeinfo == null || !nodeinfo.isAlive) {
-          return DatanodeCommand.REGISTER;
+          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
         }
 
         updateStats(nodeinfo, false);
@@ -2163,26 +2163,35 @@
         updateStats(nodeinfo, true);
         
         //check lease recovery
-        if (cmd == null) {
-          cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
+        if (cmd != null) {
+          return new DatanodeCommand[] {cmd};
         }
+      
+        ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2);
         //check pending replication
-        if (cmd == null) {
-          cmd = nodeinfo.getReplicationCommand(
+        cmd = nodeinfo.getReplicationCommand(
               maxReplicationStreams - xmitsInProgress);
+        if (cmd != null) {
+          cmds.add(cmd);
         }
         //check block invalidation
-        if (cmd == null) {
-          cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+        cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
+        if (cmd != null) {
+          cmds.add(cmd);
+        }
+        if (!cmds.isEmpty()) {
+          return cmds.toArray(new DatanodeCommand[cmds.size()]);
         }
       }
     }
 
     //check distributed upgrade
-    if (cmd == null) {
-      cmd = getDistributedUpgradeCommand();
+    cmd = getDistributedUpgradeCommand();
+    if (cmd != null) {
+      return new DatanodeCommand[] {cmd};
     }
-    return cmd;
+    return null;
   }
 
   private void updateStats(DatanodeDescriptor node, boolean isAdded) {

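Pulling the new control flow out of the hunks above, handleHeartbeat() now
behaves roughly as follows (a simplified sketch, not the actual method body;
the synchronization and registration checks are omitted):

    // Lease recovery preempts everything else and is returned alone.
    DatanodeCommand cmd = nodeinfo.getLeaseRecoveryCommand(Integer.MAX_VALUE);
    if (cmd != null) {
      return new DatanodeCommand[] {cmd};
    }
    ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(2);
    // DNA_TRANSFER, capped by the remaining replication-stream budget.
    cmd = nodeinfo.getReplicationCommand(maxReplicationStreams - xmitsInProgress);
    if (cmd != null) {
      cmds.add(cmd);
    }
    // DNA_INVALIDATE, capped at blockInvalidateLimit blocks per heartbeat.
    cmd = nodeinfo.getInvalidateBlocks(blockInvalidateLimit);
    if (cmd != null) {
      cmds.add(cmd);
    }
    if (!cmds.isEmpty()) {
      return cmds.toArray(new DatanodeCommand[cmds.size()]);
    }
    // Fall through: a distributed-upgrade command, or null if idle.
    cmd = getDistributedUpgradeCommand();
    return cmd == null ? null : new DatanodeCommand[] {cmd};

So replication and invalidation are the only commands that travel together;
lease-recovery and upgrade commands are still returned on their own.
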
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon Feb  2 19:03:12 2009
@@ -697,10 +697,10 @@
 
   /**
    * Data node notify the name node that it is alive 
-   * Return a block-oriented command for the datanode to execute.
+   * Return an array of block-oriented commands for the datanode to execute.
    * This will be either a transfer or a delete operation.
    */
-  public DatanodeCommand sendHeartbeat(DatanodeRegistration nodeReg,
+  public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
                                        long capacity,
                                        long dfsUsed,
                                        long remaining,

Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Mon Feb  2 19:03:12 2009
@@ -35,15 +35,10 @@
  **********************************************************************/
 public interface DatanodeProtocol extends VersionedProtocol {
   /**
-   * 18: In sendHeartbeat, the capacity parameter reported was sum of 
-   *     the filesystem disk space of all the data directories. This is 
-   *     changed to exclude the reserved capacity defined by 
-   *     dfs.datanode.du.reserved. 
-   *
-   *     The new capacity reported is sum of the filesystem disk space of 
-   *     all the data directories minus the reserved capacity.
+   * 19: sendHeartbeat returns an array of DatanodeCommand objects
+   *     instead of a single DatanodeCommand object.
    */
-  public static final long versionID = 18L;
+  public static final long versionID = 19L;
   
   // error code
   final static int NOTIFY = 0;
@@ -77,11 +72,12 @@
   /**
    * sendHeartbeat() tells the NameNode that the DataNode is still
    * alive and well.  Includes some status info, too. 
-   * It also gives the NameNode a chance to return a "DatanodeCommand" object.
+   * It also gives the NameNode a chance to return 
+   * an array of "DatanodeCommand" objects.
    * A DatanodeCommand tells the DataNode to invalidate local block(s), 
    * or to copy them to other DataNodes, etc.
    */
-  public DatanodeCommand sendHeartbeat(DatanodeRegistration registration,
+  public DatanodeCommand[] sendHeartbeat(DatanodeRegistration registration,
                                        long capacity,
                                        long dfsUsed, long remaining,
                                        int xmitsInProgress,

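Because the on-the-wire return type of sendHeartbeat changes, the versionID
bump from 18 to 19 is what protects mismatched peers from mis-decoding the
reply. As a rough, hypothetical sketch of the guard this relies on
(DatanodeProtocol extends VersionedProtocol, whose version check at
connection setup looks approximately like this):

    long serverVersion = namenode.getProtocolVersion(
        DatanodeProtocol.class.getName(), DatanodeProtocol.versionID);
    if (serverVersion != DatanodeProtocol.versionID) {
      throw new IOException("Incompatible DatanodeProtocol versions: client="
          + DatanodeProtocol.versionID + ", server=" + serverVersion);
    }
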
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java?rev=740077&r1=740076&r2=740077&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java Mon Feb  2 19:03:12 2009
@@ -724,10 +724,13 @@
      */
     void sendHeartbeat() throws IOException {
       // register datanode
-      DatanodeCommand cmd = nameNode.sendHeartbeat(
+      DatanodeCommand[] cmds = nameNode.sendHeartbeat(
           dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
-      if(cmd != null)
-        LOG.debug("sendHeartbeat Name-node reply: " + cmd.getAction());
+      if(cmds != null) {
+        for (DatanodeCommand cmd : cmds) {
+          LOG.debug("sendHeartbeat Name-node reply: " + cmd.getAction());
+        }
+      }
     }
 
     boolean addBlock(Block blk) {
@@ -755,13 +758,18 @@
      */
     int replicateBlocks() throws IOException {
       // register datanode
-      DatanodeCommand cmd = nameNode.sendHeartbeat(
+      DatanodeCommand[] cmds = nameNode.sendHeartbeat(
           dnRegistration, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, 0, 0);
-      if(cmd == null || cmd.getAction() != DatanodeProtocol.DNA_TRANSFER)
-        return 0;
-      // Send a copy of a block to another datanode
-      BlockCommand bcmd = (BlockCommand)cmd;
-      return transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
+      if (cmds != null) {
+        for (DatanodeCommand cmd : cmds) {
+          if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
+            // Send a copy of a block to another datanode
+            BlockCommand bcmd = (BlockCommand)cmd;
+            return transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
+          }
+        }
+      }
+      return 0;
     }
 
     /**

Added: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java?rev=740077&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/server/namenode/TestHeartbeatHandling.java Mon Feb  2 19:03:12 2009
@@ -0,0 +1,88 @@
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.ArrayList;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests that FSNamesystem handles heartbeats correctly
+ */
+public class TestHeartbeatHandling extends TestCase {
+  /**
+   * Test that {@link FSNamesystem#handleHeartbeat(DatanodeRegistration, long, long, long, int, int)}
+   * picks up pending replication and/or invalidation requests
+   * and observes the per-heartbeat limits on each
+   */
+  public void testHeartbeat() throws Exception {
+    final Configuration conf = new Configuration();
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    try {
+      cluster.waitActive();
+      final FSNamesystem namesystem = cluster.getNamesystem();
+      final DatanodeRegistration nodeReg = cluster.getDataNodes().get(0).dnRegistration;
+      DatanodeDescriptor dd = namesystem.getDatanode(nodeReg);
+      
+      final int REMAINING_BLOCKS = 1;
+      final int MAX_REPLICATE_LIMIT = conf.getInt("dfs.max-repl-streams", 2);
+      final int MAX_INVALIDATE_LIMIT = FSNamesystem.BLOCK_INVALIDATE_CHUNK;
+      final int MAX_INVALIDATE_BLOCKS = 2*MAX_INVALIDATE_LIMIT+REMAINING_BLOCKS;
+      final int MAX_REPLICATE_BLOCKS = 2*MAX_REPLICATE_LIMIT+REMAINING_BLOCKS;
+      final DatanodeDescriptor[] ONE_TARGET = new DatanodeDescriptor[1];
+
+      synchronized (namesystem.heartbeats) {
+      for (int i=0; i<MAX_REPLICATE_BLOCKS; i++) {
+        dd.addBlockToBeReplicated(
+            new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP), ONE_TARGET);
+      }
+      DatanodeCommand[] cmds = namesystem.handleHeartbeat(
+          nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
+      assertEquals(1, cmds.length);
+      assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
+      assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
+      
+      ArrayList<Block> blockList = new ArrayList<Block>(MAX_INVALIDATE_BLOCKS);
+      for (int i=0; i<MAX_INVALIDATE_BLOCKS; i++) {
+        blockList.add(new Block(i, 0, GenerationStamp.FIRST_VALID_STAMP));
+      }
+      dd.addBlocksToBeInvalidated(blockList);
+           
+      cmds = namesystem.handleHeartbeat(
+          nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
+      assertEquals(2, cmds.length);
+      assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
+      assertEquals(MAX_REPLICATE_LIMIT, ((BlockCommand)cmds[0]).getBlocks().length);
+      assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
+      assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
+      
+      cmds = namesystem.handleHeartbeat(
+          nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
+      assertEquals(2, cmds.length);
+      assertEquals(DatanodeProtocol.DNA_TRANSFER, cmds[0].getAction());
+      assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
+      assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[1].getAction());
+      assertEquals(MAX_INVALIDATE_LIMIT, ((BlockCommand)cmds[1]).getBlocks().length);
+      
+      cmds = namesystem.handleHeartbeat(
+          nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
+      assertEquals(1, cmds.length);
+      assertEquals(DatanodeProtocol.DNA_INVALIDATE, cmds[0].getAction());
+      assertEquals(REMAINING_BLOCKS, ((BlockCommand)cmds[0]).getBlocks().length);
+
+      cmds = namesystem.handleHeartbeat(
+          nodeReg, dd.getCapacity(), dd.getDfsUsed(), dd.getRemaining(), 0, 0);
+      assertEquals(null, cmds);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}
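
The test walks the reply through all of its shapes: a transfer-only reply, a
transfer-plus-invalidate reply at the per-heartbeat limits, the leftover
blocks, an invalidate-only reply, and finally the null reply once both queues
drain. With the Ant build of that era it should be runnable on its own via
something like "ant test -Dtestcase=TestHeartbeatHandling" (assuming the
standard core test target).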


