hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r468115 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/
Date Thu, 26 Oct 2006 20:22:37 GMT
Author: cutting
Date: Thu Oct 26 13:22:36 2006
New Revision: 468115

URL: http://svn.apache.org/viewvc?view=rev&rev=468115
Log:
HADOOP-641.  Change NameNoide to request a fresh block report from re-discovered DataNodes.
 Contributed by Konstantin.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu Oct 26 13:22:36 2006
@@ -62,6 +62,10 @@
     as the source for file copy and move commands.
     (Dhruba Borthakur via cutting)
 
+17. HADOOP-641.  Change NameNode to request a fresh block report from
+    a re-discovered DataNode, so that no-longer-needed replications
+    are stopped promptly.  (Konstantin Shvachko via cutting)
+
 
 Release 0.7.2 - 2006-10-18
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java Thu Oct 26 13:22:36
2006
@@ -37,54 +37,43 @@
            public Writable newInstance() { return new BlockCommand(); }
          });
     }
-  
-    boolean transferBlocks = false;
-    boolean invalidateBlocks = false;
-    boolean shutdown = false;
+
+    DatanodeProtocol.DataNodeAction action;
     Block blocks[];
     DatanodeInfo targets[][];
 
     public BlockCommand() {
-        this.transferBlocks = false;
-        this.invalidateBlocks = false;
-        this.shutdown = false;
-        this.blocks = new Block[0];
-        this.targets = new DatanodeInfo[0][];
+      this.action = DatanodeProtocol.DataNodeAction.DNA_UNKNOWN;
+      this.blocks = new Block[0];
+      this.targets = new DatanodeInfo[0][];
     }
 
+    /**
+     * Create BlockCommand for transferring blocks to another datanode
+     * @param blocks    blocks to be transferred 
+     * @param targets   nodes to transfer
+     */
     public BlockCommand(Block blocks[], DatanodeInfo targets[][]) {
-        this.transferBlocks = true;
-        this.invalidateBlocks = false;
-        this.shutdown = false;
-        this.blocks = blocks;
-        this.targets = targets;
+      this.action = DatanodeProtocol.DataNodeAction.DNA_TRANSFER;
+      this.blocks = blocks;
+      this.targets = targets;
     }
 
+    /**
+     * Create BlockCommand for block invalidation
+     * @param blocks  blocks to invalidate
+     */
     public BlockCommand(Block blocks[]) {
-        this.transferBlocks = false;
-        this.invalidateBlocks = true;
-        this.shutdown = false;
-        this.blocks = blocks;
-        this.targets = new DatanodeInfo[0][];
+      this.action = DatanodeProtocol.DataNodeAction.DNA_INVALIDATE;
+      this.blocks = blocks;
+      this.targets = new DatanodeInfo[0][];
     }
 
-    public BlockCommand( boolean doShutdown ) {
+    public BlockCommand( DatanodeProtocol.DataNodeAction action ) {
       this();
-      this.shutdown = doShutdown;
-    }
-
-    public boolean transferBlocks() {
-        return transferBlocks;
+      this.action = action;
     }
 
-    public boolean invalidateBlocks() {
-        return invalidateBlocks;
-    }
-    
-    public boolean shutdownNode() {
-      return shutdown;
-  }
-  
     public Block[] getBlocks() {
         return blocks;
     }
@@ -97,8 +86,7 @@
     // Writable
     ///////////////////////////////////////////
     public void write(DataOutput out) throws IOException {
-        out.writeBoolean(transferBlocks);
-        out.writeBoolean(invalidateBlocks);        
+        WritableUtils.writeEnum( out, action );
         out.writeInt(blocks.length);
         for (int i = 0; i < blocks.length; i++) {
             blocks[i].write(out);
@@ -113,8 +101,8 @@
     }
 
     public void readFields(DataInput in) throws IOException {
-        this.transferBlocks = in.readBoolean();
-        this.invalidateBlocks = in.readBoolean();
+        this.action = (DatanodeProtocol.DataNodeAction)
+            WritableUtils.readEnum( in, DatanodeProtocol.DataNodeAction.class );
         this.blocks = new Block[in.readInt()];
         for (int i = 0; i < blocks.length; i++) {
             blocks[i] = new Block();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Thu Oct 26 13:22:36 2006
@@ -267,7 +267,17 @@
      * @throws IOException
      */
     private void register() throws IOException {
-      dnRegistration = namenode.register( dnRegistration );
+      while( true ) {
+        try {
+          dnRegistration = namenode.register( dnRegistration );
+          break;
+        } catch( SocketTimeoutException e ) {  // namenode is busy
+          LOG.info("Problem connecting to server: " + getNameNodeAddr());
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ie) {}
+        }
+      }
       if( storage.getStorageID().equals("") ) {
         storage.setStorageID( dnRegistration.getStorageID());
         storage.writeAll();
@@ -350,29 +360,14 @@
 
             if( cmd != null ) {
               data.checkDataDir();
-              if (cmd.transferBlocks()) {
+              switch( cmd.action ) {
+              case DNA_TRANSFER:
                 //
                 // Send a copy of a block to another datanode
                 //
-                Block blocks[] = cmd.getBlocks();
-                DatanodeInfo xferTargets[][] = cmd.getTargets();
-                    
-                for (int i = 0; i < blocks.length; i++) {
-                  if (!data.isValidBlock(blocks[i])) {
-                    String errStr = "Can't send invalid block " + blocks[i];
-                    LOG.info(errStr);
-                    namenode.errorReport( dnRegistration, 
-                                DatanodeProtocol.INVALID_BLOCK, 
-                                errStr);
-                    break;
-                  } else {
-                    if (xferTargets[i].length > 0) {
-                        LOG.info("Starting thread to transfer block " + blocks[i] + " to
" + xferTargets[i]);
-                        new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
-                    }
-                  }
-                }
-              } else if (cmd.invalidateBlocks()) {
+                transferBlocks( cmd.getBlocks(), cmd.getTargets() );
+                break;
+              case DNA_INVALIDATE:
                 //
                 // Some local block(s) are obsolete and can be 
                 // safely garbage-collected.
@@ -380,10 +375,17 @@
                 Block toDelete[] = cmd.getBlocks();
                 data.invalidate(toDelete);
                 myMetrics.removedBlocks(toDelete.length);
-              } else if( cmd.shutdownNode()) {
+                break;
+              case DNA_SHUTDOWN:
                 // shut down the data node
                 this.shutdown();
                 continue;
+              case DNA_REPORT:
+                // namenode requested a block report; sending
+                lastBlockReport = 0;
+                break;
+              default:
+                LOG.warn( "Unknown BlockCommand action: " + cmd.action);
               }
             }
           }
@@ -455,7 +457,24 @@
       } // while (shouldRun)
     } // offerService
 
-    
+    private void transferBlocks(  Block blocks[], 
+                                  DatanodeInfo xferTargets[][] 
+                                ) throws IOException {
+      for (int i = 0; i < blocks.length; i++) {
+        if (!data.isValidBlock(blocks[i])) {
+          String errStr = "Can't send invalid block " + blocks[i];
+          LOG.info(errStr);
+          namenode.errorReport( dnRegistration, 
+                                DatanodeProtocol.INVALID_BLOCK, 
+                                errStr );
+          break;
+        }
+        if (xferTargets[i].length > 0) {
+          LOG.info("Starting thread to transfer block " + blocks[i] + " to " + xferTargets[i]);
+          new Daemon(new DataTransfer(xferTargets[i], blocks[i])).start();
+        }
+      }
+    }
     
     /**
      * Server used for receiving/sending a block of data.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Thu Oct 26 13:22:36
2006
@@ -29,12 +29,23 @@
  * @author Michael Cafarella
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
-  public static final long versionID = 2L;  // infoPort added to DatanodeID
-                                            // affected: DatanodeRegistration
+  public static final long versionID = 3L;  // BlockCommand.action replaced boolean members
+                                            // affected: BlockCommand
   
   // error code
   final static int DISK_ERROR = 1;
   final static int INVALID_BLOCK = 2;
+
+  /**
+   * Determines actions that data node should perform 
+   * when receiving a block command. 
+   */
+  public enum DataNodeAction{ DNA_UNKNOWN,    // unknown action   
+                              DNA_TRANSFER,   // transfer blocks to another datanode
+                              DNA_INVALIDATE, // invalidate blocks
+                              DNA_SHUTDOWN,   // shutdown node
+                              DNA_REPORT; }   // send block report to the namenode
+
   /** 
    * Register Datanode.
    *

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Thu Oct 26 13:22:36
2006
@@ -1297,14 +1297,23 @@
      * The given node has reported in.  This method should:
      * 1) Record the heartbeat, so the datanode isn't timed out
      * 2) Adjust usage stats for future block allocation
+     * 
+     * If a substantial amount of time passed since the last datanode 
+     * heartbeat then request an immediate block report.  
+     * 
+     * @return true if block report is required or false otherwise.
+     * @throws IOException
      */
-    public synchronized void gotHeartbeat(DatanodeID nodeID,
-                                          long capacity, 
-                                          long remaining,
-                                          int xceiverCount) throws IOException {
+    public synchronized boolean gotHeartbeat( DatanodeID nodeID,
+                                              long capacity, 
+                                              long remaining,
+                                              int xceiverCount
+                                            ) throws IOException {
+      boolean needBlockReport;
       synchronized (heartbeats) {
         synchronized (datanodeMap) {
           DatanodeDescriptor nodeinfo = getDatanode( nodeID );
+          needBlockReport = nodeinfo.isDead(); 
           
           if (nodeinfo == null) 
             // We do not accept unregistered guests
@@ -1314,6 +1323,7 @@
           addHeartbeat(nodeinfo);
         }
       }
+      return needBlockReport;
     }
 
     /**
@@ -1902,6 +1912,9 @@
                       "BLOCK* NameSystem.pendingTransfer: " + "ask "
                       + srcNode.getName() + " to replicate "
                       + block.getBlockName() + " to " + targetList);
+              NameNode.stateChangeLog.debug(
+                  "BLOCK* neededReplications = " + neededReplications.size()
+                  + " pendingReplications = " + pendingReplications.size() );
             }
           }
 
@@ -2280,7 +2293,7 @@
        */
       synchronized boolean isOn() {
         try {
-          isConsistent();   // SHV this an assert
+          isConsistent();   // SHV this is an assert
         } catch( IOException e ) {
           System.err.print( StringUtils.stringifyException( e ));
         }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=468115&r1=468114&r2=468115
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Thu Oct 26 13:22:36 2006
@@ -464,7 +464,10 @@
                                       int xmitsInProgress,
                                       int xceiverCount) throws IOException {
         verifyRequest( nodeReg );
-        namesystem.gotHeartbeat( nodeReg, capacity, remaining, xceiverCount );
+        if( namesystem.gotHeartbeat( nodeReg, capacity, remaining, xceiverCount )) {
+          // request block report from the datanode
+          return new BlockCommand( DataNodeAction.DNA_REPORT );
+        }
         
         //
         // Ask to perform pending transfers, if any



Mime
View raw message