hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject svn commit: r885143 [9/18] - in /hadoop/hdfs/branches/HDFS-326: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/ant/org/apache/hadoop/ant/ src/ant/org/apache/hadoop/ant/condition/ src/c++/ src/c++/libhdfs/ src/c++/libhdfs/docs/...
Date Sat, 28 Nov 2009 20:06:08 GMT
Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
(original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
Sat Nov 28 20:05:56 2009
@@ -29,11 +29,11 @@
 import java.io.PrintStream;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 import java.util.TreeSet;
 import java.util.regex.Matcher;
@@ -49,6 +49,7 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -211,8 +212,8 @@
   private void init() {
     
     // get the list of blocks and arrange them in random order
-    Block arr[] = dataset.getBlockReport();
-    Collections.shuffle(Arrays.asList(arr));
+    List<Block> arr = dataset.getFinalizedBlocks();
+    Collections.shuffle(arr);
     
     blockInfoSet = new TreeSet<BlockScanInfo>();
     blockMap = new HashMap<Block, BlockScanInfo>();
@@ -373,7 +374,7 @@
   static private class LogEntry {
     long blockId = -1;
     long verificationTime = -1;
-    long genStamp = Block.GRANDFATHER_GENERATION_STAMP;
+    long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
     
     /**
      * The format consists of single line with multiple entries. each 
@@ -450,7 +451,6 @@
         return;
       } catch (IOException e) {
 
-        totalScanErrors++;
         updateScanStatus(block, ScanType.VERIFICATION_SCAN, false);
 
         // If the block does not exists anymore, then its not an error
@@ -466,6 +466,7 @@
                  StringUtils.stringifyException(e));
         
         if (second) {
+          totalScanErrors++;
           datanode.getMetrics().blockVerificationFailures.inc(); 
           handleScanFailure(block);
           return;

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
(original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
Sat Nov 28 20:05:56 2009
@@ -34,11 +34,10 @@
 import java.util.AbstractList;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -55,11 +54,16 @@
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
+import org.apache.hadoop.hdfs.security.ExportedAccessKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -67,7 +71,7 @@
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
-import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -75,7 +79,11 @@
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RPC;
@@ -83,9 +91,6 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
-import org.apache.hadoop.security.ExportedAccessKeys;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authorize.ConfiguredPolicy;
 import org.apache.hadoop.security.authorize.PolicyProvider;
@@ -169,8 +174,6 @@
 
   volatile boolean shouldRun = true;
   private LinkedList<Block> receivedBlockList = new LinkedList<Block>();
-  /** list of blocks being recovered */
-  private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();
   private LinkedList<String> delHints = new LinkedList<String>();
   public final static String EMPTY_DEL_HINT = "";
   AtomicInteger xmitsInProgress = new AtomicInteger();
@@ -268,8 +271,8 @@
                      AbstractList<File> dataDirs
                      ) throws IOException, InterruptedException {
     // use configured nameserver & interface to get local hostname
-    if (conf.get("slave.host.name") != null) {
-      machineName = conf.get("slave.host.name");   
+    if (conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY) != null) {
+      machineName = conf.get(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY);   
     }
     if (machineName == null) {
       machineName = DNS.getDefaultHost(
@@ -278,7 +281,7 @@
     }
     this.nameNodeAddr = NameNode.getAddress(conf);
     
-    this.socketTimeout =  conf.getInt("dfs.socket.timeout",
+    this.socketTimeout =  conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
                                       HdfsConstants.READ_TIMEOUT);
     this.socketWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
                                           HdfsConstants.WRITE_TIMEOUT);
@@ -286,7 +289,8 @@
      * to false on some of them. */
     this.transferToAllowed = conf.getBoolean("dfs.datanode.transferTo.allowed", 
                                              true);
-    this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
+    this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
+                                       DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
     InetSocketAddress socAddr = NetUtils.createSocketAddr(
         conf.get("dfs.datanode.address", "0.0.0.0:50010"));
     int tmpPort = socAddr.getPort();
@@ -313,7 +317,7 @@
         dnRegistration.storageInfo.namespaceID = nsInfo.namespaceID;
         // it would have been better to pass storage as a parameter to
         // constructor below - need to augment ReflectionUtils used below.
-        conf.set("StorageId", dnRegistration.getStorageID());
+        conf.set(DFSConfigKeys.DFS_DATANODE_STORAGEID_KEY, dnRegistration.getStorageID());
         try {
           //Equivalent of following (can't do because Simulated is in test dir)
           //  this.data = new SimulatedFSDataset(conf);
@@ -385,10 +389,11 @@
       LOG.debug("Datanode listening on " + infoHost + ":" + tmpInfoPort);
     }
     if (conf.getBoolean("dfs.https.enable", false)) {
-      boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
+      boolean needClientAuth = conf.getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
+                                               DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
           "dfs.datanode.https.address", infoHost + ":" + 0));
-      Configuration sslConf = new Configuration(false);
+      Configuration sslConf = new HdfsConfiguration(false);
       sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
           "ssl-server.xml"));
       if (LOG.isDebugEnabled()) {
@@ -405,7 +410,7 @@
     this.infoServer.start();
     // adjust info port
     this.dnRegistration.setInfoPort(this.infoServer.getPort());
-    myMetrics = new DataNodeMetrics(conf, dnRegistration.getStorageID());
+    myMetrics = new DataNodeMetrics(conf, dnRegistration.getName());
     
     // set service-level authorization security policy
     if (conf.getBoolean(
@@ -963,7 +968,7 @@
       processDistributedUpgradeCommand((UpgradeCommand)cmd);
       break;
     case DatanodeProtocol.DNA_RECOVERBLOCK:
-      recoverBlocks(bcmd.getBlocks(), bcmd.getTargets());
+      recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
       break;
     case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
       LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");
@@ -1040,13 +1045,12 @@
       // and can be safely GC'ed.
       //
       long brStartTime = now();
-      Block[] bReport = data.getBlockReport();
+      BlockListAsLongs bReport = data.getBlockReport();
 
-      cmd = namenode.blockReport(dnRegistration,
-              BlockListAsLongs.convertToArrayLongs(bReport));
+      cmd = namenode.blockReport(dnRegistration, bReport.getBlockListAsLongs());
       long brTime = now() - brStartTime;
       myMetrics.blockReports.inc(brTime);
-      LOG.info("BlockReport of " + bReport.length +
+      LOG.info("BlockReport of " + bReport.getNumberOfBlocks() +
           " blocks got processed in " + brTime + " msecs");
       //
       // If we have sent the first block report, then wait a random
@@ -1295,13 +1299,14 @@
         //
         // Header info
         //
-        AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+        BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
         if (isAccessTokenEnabled) {
           accessToken = accessTokenHandler.generateToken(null, b.getBlockId(),
               EnumSet.of(AccessTokenHandler.AccessMode.WRITE));
         }
         DataTransferProtocol.Sender.opWriteBlock(out,
-            b.getBlockId(), b.getGenerationStamp(), 0, false, "",
+            b.getBlockId(), b.getGenerationStamp(), 0, 
+            BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0, 0, "",
             srcNode, targets, accessToken);
 
         // send data & checksum
@@ -1324,6 +1329,20 @@
       }
     }
   }
+  
+  /**
+   * After a block becomes finalized, a datanode increases metric counter,
+   * notifies namenode, and adds it to the block scanner
+   * @param block
+   * @param delHint
+   */
+  void closeBlock(Block block, String delHint) {
+    myMetrics.blocksWritten.inc();
+    notifyNamenodeReceivedBlock(block, delHint);
+    if (blockScanner != null) {
+      blockScanner.addBlock(block);
+    }
+  }
 
   /**
    * No matter what kind of exception we get, keep retrying to offerService().
@@ -1382,7 +1401,7 @@
   public static DataNode instantiateDataNode(String args[],
                                       Configuration conf) throws IOException {
     if (conf == null)
-      conf = new Configuration();
+      conf = new HdfsConfiguration();
     
     if (args != null) {
       // parse generic hadoop options
@@ -1399,7 +1418,7 @@
           " anymore. RackID resolution is handled by the NameNode.");
       System.exit(-1);
     }
-    String[] dataDirs = conf.getStrings("dfs.data.dir");
+    String[] dataDirs = conf.getStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
     dnThreadName = "DataNode: [" +
                         StringUtils.arrayToString(dataDirs) + "]";
     return makeInstance(dataDirs, conf);
@@ -1561,42 +1580,16 @@
     }
   }
 
-  // InterDataNodeProtocol implementation
-  /** {@inheritDoc} */
-  public BlockMetaDataInfo getBlockMetaDataInfo(Block block
-      ) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("block=" + block);
-    }
-    Block stored = data.getStoredBlock(block.getBlockId());
-
-    if (stored == null) {
-      return null;
-    }
-    BlockMetaDataInfo info = new BlockMetaDataInfo(stored,
-                                 blockScanner.getLastScanTime(stored));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getBlockMetaDataInfo successful block=" + stored +
-                " length " + stored.getNumBytes() +
-                " genstamp " + stored.getGenerationStamp());
-    }
-
-    // paranoia! verify that the contents of the stored block
-    // matches the block file on disk.
-    data.validateBlockMetadata(stored);
-    return info;
-  }
-
-  public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
+  public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
     Daemon d = new Daemon(threadGroup, new Runnable() {
       /** Recover a list of blocks. It is run by the primary datanode. */
       public void run() {
-        for(int i = 0; i < blocks.length; i++) {
+        for(RecoveringBlock b : blocks) {
           try {
-            logRecoverBlock("NameNode", blocks[i], targets[i]);
-            recoverBlock(blocks[i], false, targets[i], true);
+            logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
+            recoverBlock(b);
           } catch (IOException e) {
-            LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
+            LOG.warn("recoverBlocks FAILED: " + b, e);
           }
         }
       }
@@ -1605,22 +1598,39 @@
     return d;
   }
 
-  /** {@inheritDoc} */
-  public void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException
{
-    LOG.info("oldblock=" + oldblock + "(length=" + oldblock.getNumBytes()
-        + "), newblock=" + newblock + "(length=" + newblock.getNumBytes()
-        + "), datanode=" + dnRegistration.getName());
-    data.updateBlock(oldblock, newblock);
-    if (finalize) {
-      data.finalizeBlock(newblock);
-      myMetrics.blocksWritten.inc(); 
-      notifyNamenodeReceivedBlock(newblock, EMPTY_DEL_HINT);
-      LOG.info("Received block " + newblock +
-                " of size " + newblock.getNumBytes() +
-                " as part of lease recovery.");
+  // InterDataNodeProtocol implementation
+  @Override // InterDatanodeProtocol
+  public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
+  throws IOException {
+    return data.initReplicaRecovery(rBlock);
+  }
+
+  /**
+   * Convenience method, which unwraps RemoteException.
+   * @throws IOException not a RemoteException.
+   */
+  private static ReplicaRecoveryInfo callInitReplicaRecovery(
+      InterDatanodeProtocol datanode,
+      RecoveringBlock rBlock) throws IOException {
+    try {
+      return datanode.initReplicaRecovery(rBlock);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException();
     }
   }
 
+  /**
+   * Update replica with the new generation stamp and length.  
+   */
+  @Override // InterDatanodeProtocol
+  public Block updateReplicaUnderRecovery(Block oldBlock,
+                                          long recoveryId,
+                                          long newLength) throws IOException {
+    ReplicaInfo r =
+      data.updateReplicaUnderRecovery(oldBlock, recoveryId, newLength);
+    return new Block(r);
+  }
+
   /** {@inheritDoc} */
   public long getProtocolVersion(String protocol, long clientVersion
       ) throws IOException {
@@ -1633,164 +1643,171 @@
         + ": " + protocol);
   }
 
-  /** A convenient class used in lease recovery */
+  /** A convenient class used in block recovery */
   private static class BlockRecord { 
     final DatanodeID id;
     final InterDatanodeProtocol datanode;
-    final Block block;
+    final ReplicaRecoveryInfo rInfo;
     
-    BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+    BlockRecord(DatanodeID id,
+                InterDatanodeProtocol datanode,
+                ReplicaRecoveryInfo rInfo) {
       this.id = id;
       this.datanode = datanode;
-      this.block = block;
+      this.rInfo = rInfo;
     }
 
     /** {@inheritDoc} */
     public String toString() {
-      return "block:" + block + " node:" + id;
+      return "block:" + rInfo + " node:" + id;
     }
   }
 
   /** Recover a block */
-  private LocatedBlock recoverBlock(Block block, boolean keepLength,
-      DatanodeInfo[] targets, boolean closeFile) throws IOException {
-
+  private void recoverBlock(RecoveringBlock rBlock) throws IOException {
+    Block block = rBlock.getBlock();
+    DatanodeInfo[] targets = rBlock.getLocations();
     DatanodeID[] datanodeids = (DatanodeID[])targets;
-    // If the block is already being recovered, then skip recovering it.
-    // This can happen if the namenode and client start recovering the same
-    // file at the same time.
-    synchronized (ongoingRecovery) {
-      Block tmp = new Block();
-      tmp.set(block.getBlockId(), block.getNumBytes(), GenerationStamp.WILDCARD_STAMP);
-      if (ongoingRecovery.get(tmp) != null) {
-        String msg = "Block " + block + " is already being recovered, " +
-                     " ignoring this request to recover it.";
-        LOG.info(msg);
-        throw new IOException(msg);
-      }
-      ongoingRecovery.put(block, block);
-    }
-    try {
-      List<BlockRecord> syncList = new ArrayList<BlockRecord>();
-      long minlength = Long.MAX_VALUE;
-      int errorCount = 0;
+    List<BlockRecord> syncList = new ArrayList<BlockRecord>(datanodeids.length);
+    int errorCount = 0;
 
-      //check generation stamps
-      for(DatanodeID id : datanodeids) {
-        try {
-          InterDatanodeProtocol datanode = dnRegistration.equals(id)?
-              this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
-          BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
-          if (info != null && info.getGenerationStamp() >= block.getGenerationStamp())
{
-            if (keepLength) {
-              if (info.getNumBytes() == block.getNumBytes()) {
-                syncList.add(new BlockRecord(id, datanode, new Block(info)));
-              }
-            }
-            else {
-              syncList.add(new BlockRecord(id, datanode, new Block(info)));
-              if (info.getNumBytes() < minlength) {
-                minlength = info.getNumBytes();
-              }
-            }
-          }
-        } catch (IOException e) {
-          ++errorCount;
-          InterDatanodeProtocol.LOG.warn(
-              "Failed to getBlockMetaDataInfo for block (=" + block 
-              + ") from datanode (=" + id + ")", e);
+    //check generation stamps
+    for(DatanodeID id : datanodeids) {
+      try {
+        InterDatanodeProtocol datanode = dnRegistration.equals(id)?
+            this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
+        ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
+        if (info != null &&
+            info.getGenerationStamp() >= block.getGenerationStamp() &&
+            info.getNumBytes() > 0) {
+          syncList.add(new BlockRecord(id, datanode, info));
         }
+      } catch (RecoveryInProgressException ripE) {
+        InterDatanodeProtocol.LOG.warn(
+            "Recovery for replica " + block + " on data-node " + id
+            + " is already in progress. Recovery id = "
+            + rBlock.getNewGenerationStamp() + " is aborted.", ripE);
+        return;
+      } catch (IOException e) {
+        ++errorCount;
+        InterDatanodeProtocol.LOG.warn(
+            "Failed to obtain replica info for block (=" + block 
+            + ") from datanode (=" + id + ")", e);
       }
+    }
 
-      if (syncList.isEmpty() && errorCount > 0) {
-        throw new IOException("All datanodes failed: block=" + block
-            + ", datanodeids=" + Arrays.asList(datanodeids));
-      }
-      if (!keepLength) {
-        block.setNumBytes(minlength);
-      }
-      return syncBlock(block, syncList, targets, closeFile);
-    } finally {
-      synchronized (ongoingRecovery) {
-        ongoingRecovery.remove(block);
-      }
+    if (errorCount == datanodeids.length) {
+      throw new IOException("All datanodes failed: block=" + block
+          + ", datanodeids=" + Arrays.asList(datanodeids));
     }
+
+    syncBlock(rBlock, syncList);
   }
 
   /** Block synchronization */
-  private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
-      DatanodeInfo[] targets, boolean closeFile) throws IOException {
+  private void syncBlock(RecoveringBlock rBlock,
+                         List<BlockRecord> syncList) throws IOException {
+    Block block = rBlock.getBlock();
+    long recoveryId = rBlock.getNewGenerationStamp();
     if (LOG.isDebugEnabled()) {
       LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
-          + "), syncList=" + syncList + ", closeFile=" + closeFile);
+          + "), syncList=" + syncList);
     }
 
-    //syncList.isEmpty() that all datanodes do not have the block
-    //so the block can be deleted.
+    // syncList.isEmpty() means that all data-nodes do not have the block
+    // or their replicas have 0 length.
+    // The block can be deleted.
     if (syncList.isEmpty()) {
-      namenode.commitBlockSynchronization(block, 0, 0, closeFile, true,
-          DatanodeID.EMPTY_ARRAY);
-      //always return a new access token even if everything else stays the same
-      LocatedBlock b = new LocatedBlock(block, targets);
-      if (isAccessTokenEnabled) {
-        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
-      }
-      return b;
+      namenode.commitBlockSynchronization(block, recoveryId, 0,
+          true, true, DatanodeID.EMPTY_ARRAY);
+      return;
     }
 
-    List<DatanodeID> successList = new ArrayList<DatanodeID>();
+    // Calculate the best available replica state.
+    ReplicaState bestState = ReplicaState.RWR;
+    long finalizedLength = -1;
+    for(BlockRecord r : syncList) {
+      assert r.rInfo.getNumBytes() > 0 : "zero length replica";
+      ReplicaState rState = r.rInfo.getOriginalReplicaState(); 
+      if(rState.getValue() < bestState.getValue())
+        bestState = rState;
+      if(rState == ReplicaState.FINALIZED) {
+        if(finalizedLength > 0 && finalizedLength != r.rInfo.getNumBytes())
+          throw new IOException("Inconsistent size of finalized replicas. " +
+              "Replica " + r.rInfo + " expected size: " + finalizedLength);
+        finalizedLength = r.rInfo.getNumBytes();
+      }
+    }
 
-    long generationstamp = namenode.nextGenerationStamp(block);
-    Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
+    // Calculate list of nodes that will participate in the recovery
+    // and the new block size
+    List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
+    Block newBlock = new Block(block.getBlockId(), -1, recoveryId);
+    switch(bestState) {
+    case FINALIZED:
+      assert finalizedLength > 0 : "finalizedLength is not positive";
+      for(BlockRecord r : syncList) {
+        ReplicaState rState = r.rInfo.getOriginalReplicaState();
+        if(rState == ReplicaState.FINALIZED ||
+           rState == ReplicaState.RBW &&
+                      r.rInfo.getNumBytes() == finalizedLength)
+          participatingList.add(r);
+      }
+      newBlock.setNumBytes(finalizedLength);
+      break;
+    case RBW:
+    case RWR:
+      long minLength = Long.MAX_VALUE;
+      for(BlockRecord r : syncList) {
+        ReplicaState rState = r.rInfo.getOriginalReplicaState();
+        if(rState == bestState) {
+          minLength = Math.min(minLength, r.rInfo.getNumBytes());
+          participatingList.add(r);
+        }
+      }
+      newBlock.setNumBytes(minLength);
+      break;
+    case RUR:
+    case TEMPORARY:
+      assert false : "bad replica state: " + bestState;
+    }
 
-    for(BlockRecord r : syncList) {
+    List<DatanodeID> failedList = new ArrayList<DatanodeID>();
+    List<DatanodeID> successList = new ArrayList<DatanodeID>();
+    for(BlockRecord r : participatingList) {
       try {
-        r.datanode.updateBlock(r.block, newblock, closeFile);
+        Block reply = r.datanode.updateReplicaUnderRecovery(
+            r.rInfo, recoveryId, newBlock.getNumBytes());
+        assert reply.equals(newBlock) &&
+               reply.getNumBytes() == newBlock.getNumBytes() :
+          "Updated replica must be the same as the new block.";
         successList.add(r.id);
       } catch (IOException e) {
         InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
-            + newblock + ", datanode=" + r.id + ")", e);
+            + newBlock + ", datanode=" + r.id + ")", e);
+        failedList.add(r.id);
       }
     }
 
-    if (!successList.isEmpty()) {
-      DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
-
-      namenode.commitBlockSynchronization(block,
-          newblock.getGenerationStamp(), newblock.getNumBytes(), closeFile, false,
-          nlist);
-      DatanodeInfo[] info = new DatanodeInfo[nlist.length];
-      for (int i = 0; i < nlist.length; i++) {
-        info[i] = new DatanodeInfo(nlist[i]);
+    // If any of the data-nodes failed, the recovery fails, because
+    // we never know the actual state of the replica on failed data-nodes.
+    // The recovery should be started over.
+    if(!failedList.isEmpty()) {
+      StringBuilder b = new StringBuilder();
+      for(DatanodeID id : failedList) {
+        b.append("\n  " + id);
       }
-      LocatedBlock b = new LocatedBlock(newblock, info); // success
-      // should have used client ID to generate access token, but since 
-      // owner ID is not checked, we simply pass null for now.
-      if (isAccessTokenEnabled) {
-        b.setAccessToken(accessTokenHandler.generateToken(null, b.getBlock()
-            .getBlockId(), EnumSet.of(AccessTokenHandler.AccessMode.WRITE)));
-      }
-      return b;
+      throw new IOException("Cannot recover " + block + ", the following "
+          + failedList.size() + " data-nodes failed {" + b + "\n}");
     }
 
-    //failed
-    StringBuilder b = new StringBuilder();
-    for(BlockRecord r : syncList) {
-      b.append("\n  " + r.id);
-    }
-    throw new IOException("Cannot recover " + block + ", none of these "
-        + syncList.size() + " datanodes success {" + b + "\n}");
+    // Notify the name-node about successfully recovered replicas.
+    DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
+    namenode.commitBlockSynchronization(block,
+        newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
+        nlist);
   }
   
-  // ClientDataNodeProtocol implementation
-  /** {@inheritDoc} */
-  public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
-      ) throws IOException {
-    logRecoverBlock("Client", block, targets);
-    return recoverBlock(block, keepLength, targets, false);
-  }
-
   private static void logRecoverBlock(String who,
       Block block, DatanodeID[] targets) {
     StringBuilder msg = new StringBuilder(targets[0].getName());
@@ -1800,4 +1817,11 @@
     LOG.info(who + " calls recoverBlock(block=" + block
         + ", targets=[" + msg + "])");
   }
+
+  // ClientDataNodeProtocol implementation
+  /** {@inheritDoc} */
+  @Override // ClientDataNodeProtocol
+  public long getReplicaVisibleLength(final Block block) throws IOException {
+    return data.getReplicaVisibleLength(block);
+  }
 }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
(original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
Sat Nov 28 20:05:56 2009
@@ -32,8 +32,8 @@
 import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.FileUtil.HardLink;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -53,6 +53,9 @@
   final static String BLOCK_SUBDIR_PREFIX = "subdir";
   final static String BLOCK_FILE_PREFIX = "blk_";
   final static String COPY_FILE_PREFIX = "dncp_";
+  final static String STORAGE_DIR_RBW = "rbw";
+  final static String STORAGE_DIR_FINALIZED = "finalized";
+  final static String STORAGE_DIR_DETACHED = "detach";
   
   private String storageID;
 
@@ -270,6 +273,8 @@
     File curDir = sd.getCurrentDir();
     File prevDir = sd.getPreviousDir();
     assert curDir.exists() : "Current directory must exist.";
+    // Cleanup directory "detach"
+    cleanupDetachDir(new File(curDir, STORAGE_DIR_DETACHED));
     // delete previous dir before upgrading
     if (prevDir.exists())
       deleteDir(prevDir);
@@ -277,8 +282,11 @@
     assert !tmpDir.exists() : "previous.tmp directory must not exist.";
     // rename current to tmp
     rename(curDir, tmpDir);
-    // hardlink blocks
-    linkBlocks(tmpDir, curDir, this.getLayoutVersion());
+    // hard link finalized & rbw blocks
+    linkAllBlocks(tmpDir, curDir);
+    // create current directory if not exists
+    if (!curDir.exists() && !curDir.mkdirs())
+      throw new IOException("Cannot create directory " + curDir);
     // write version file
     this.layoutVersion = FSConstants.LAYOUT_VERSION;
     assert this.namespaceID == nsInfo.getNamespaceID() :
@@ -290,6 +298,30 @@
     LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
   }
 
+  /**
+   * Cleanup the detachDir. 
+   * 
+   * If the directory is not empty report an error; 
+   * Otherwise remove the directory.
+   * 
+   * @param detachDir detach directory
+   * @throws IOException if the directory is not empty or it can not be removed
+   */
+  private void cleanupDetachDir(File detachDir) throws IOException {
+    if (layoutVersion >= PRE_RBW_LAYOUT_VERSION &&
+        detachDir.exists() && detachDir.isDirectory() ) {
+      
+        if (detachDir.list().length != 0 ) {
+          throw new IOException("Detached directory " + detachDir +
+              " is not empty. Please manually move each file under this " +
+              "directory to the finalized directory if the finalized " +
+              "directory tree does not have the file.");
+        } else if (!detachDir.delete()) {
+          throw new IOException("Cannot remove directory " + detachDir);
+        }
+    }
+  }
+  
   void doRollback( StorageDirectory sd,
                    NamespaceInfo nsInfo
                    ) throws IOException {
@@ -359,8 +391,34 @@
       doFinalize(it.next());
     }
   }
+
+  /**
+   * Hardlink all finalized and RBW blocks in fromDir to toDir
+   * @param fromDir directory where the snapshot is stored
+   * @param toDir the current data directory
+   * @throws IOException if error occurs during hardlink
+   */
+  private void linkAllBlocks(File fromDir, File toDir) throws IOException {
+    // do the link
+    int diskLayoutVersion = this.getLayoutVersion();
+    if (diskLayoutVersion < PRE_RBW_LAYOUT_VERSION) { // RBW version
+      // hardlink finalized blocks in tmpDir/finalized
+      linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), 
+          new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);
+      // hardlink rbw blocks in tmpDir/finalized
+      linkBlocks(new File(fromDir, STORAGE_DIR_RBW), 
+          new File(toDir, STORAGE_DIR_RBW), diskLayoutVersion);
+    } else { // pre-RBW version
+      // hardlink finalized blocks in tmpDir
+      linkBlocks(fromDir, 
+          new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion);      
+    }    
+  }
   
   static void linkBlocks(File from, File to, int oldLV) throws IOException {
+    if (!from.exists()) {
+      return;
+    }
     if (!from.isDirectory()) {
       if (from.getName().startsWith(COPY_FILE_PREFIX)) {
         FileInputStream in = new FileInputStream(from);
@@ -387,7 +445,7 @@
       return;
     }
     // from is a directory
-    if (!to.mkdir())
+    if (!to.mkdirs())
       throw new IOException("Cannot create directory " + to);
     String[] blockNames = from.list(new java.io.FilenameFilter() {
         public boolean accept(File dir, String name) {
@@ -440,7 +498,7 @@
     if (matcher.matches()) {
       //return the current metadata file name
       return FSDataset.getMetaFileName(matcher.group(1),
-                                       Block.GRANDFATHER_GENERATION_STAMP); 
+          GenerationStamp.GRANDFATHER_GENERATION_STAMP); 
     }
     return oldFileName;
   }

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
(original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
Sat Nov 28 20:05:56 2009
@@ -38,6 +38,9 @@
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
+import org.apache.hadoop.hdfs.security.AccessTokenHandler;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
 import org.apache.hadoop.io.IOUtils;
@@ -46,8 +49,6 @@
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingInt;
 import org.apache.hadoop.metrics.util.MetricsTimeVaryingRate;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
 
@@ -127,7 +128,7 @@
   @Override
   protected void opReadBlock(DataInputStream in,
       long blockId, long blockGs, long startOffset, long length,
-      String clientName, AccessToken accessToken) throws IOException {
+      String clientName, BlockAccessToken accessToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     OutputStream baseStream = NetUtils.getOutputStream(s, 
         datanode.socketWriteTimeout);
@@ -208,9 +209,10 @@
    */
   @Override
   protected void opWriteBlock(DataInputStream in, long blockId, long blockGs,
-      int pipelineSize, boolean isRecovery,
+      int pipelineSize, BlockConstructionStage stage,
+      long newGs, long minBytesRcvd, long maxBytesRcvd,
       String client, DatanodeInfo srcDataNode, DatanodeInfo[] targets,
-      AccessToken accessToken) throws IOException {
+      BlockAccessToken accessToken) throws IOException {
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
@@ -250,11 +252,17 @@
     String firstBadLink = "";           // first datanode that failed in connection setup
     DataTransferProtocol.Status mirrorInStatus = SUCCESS;
     try {
-      // open a block receiver and check if the block does not exist
-      blockReceiver = new BlockReceiver(block, in, 
-          s.getRemoteSocketAddress().toString(),
-          s.getLocalSocketAddress().toString(),
-          isRecovery, client, srcDataNode, datanode);
+      if (client.length() == 0 || 
+          stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        // open a block receiver
+        blockReceiver = new BlockReceiver(block, in, 
+            s.getRemoteSocketAddress().toString(),
+            s.getLocalSocketAddress().toString(),
+            stage, newGs, minBytesRcvd, maxBytesRcvd,
+            client, srcDataNode, datanode);
+      } else {
+        datanode.data.recoverClose(block, newGs, minBytesRcvd);
+      }
 
       //
       // Open network conn to backup machine, if 
@@ -282,10 +290,13 @@
 
           // Write header: Copied from DFSClient.java!
           DataTransferProtocol.Sender.opWriteBlock(mirrorOut,
-              block.getBlockId(), block.getGenerationStamp(), pipelineSize,
-              isRecovery, client, srcDataNode, targets, accessToken);
+              blockId, blockGs, 
+              pipelineSize, stage, newGs, minBytesRcvd, maxBytesRcvd, client, 
+              srcDataNode, targets, accessToken);
 
-          blockReceiver.writeChecksumHeader(mirrorOut);
+          if (blockReceiver != null) { // send checksum header
+            blockReceiver.writeChecksumHeader(mirrorOut);
+          }
           mirrorOut.flush();
 
           // read connect ack (only for clients, not for replication req)
@@ -336,24 +347,31 @@
       }
 
       // receive the block and mirror to the next target
-      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
-      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
-                                 mirrorAddr, null, targets.length);
+      if (blockReceiver != null) {
+        String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
+        blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
+            mirrorAddr, null, targets.length);
+      }
 
-      // if this write is for a replication request (and not
-      // from a client), then confirm block. For client-writes,
+      // update its generation stamp
+      if (client.length() != 0 && 
+          stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        block.setGenerationStamp(newGs);
+        block.setNumBytes(minBytesRcvd);
+      }
+      
+      // if this write is for a replication request or recovering
+      // a failed close for client, then confirm block. For other client-writes,
       // the block is finalized in the PacketResponder.
-      if (client.length() == 0) {
-        datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
+      if (client.length() == 0 || 
+          stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
         LOG.info("Received block " + block + 
                  " src: " + remoteAddress +
                  " dest: " + localAddress +
                  " of size " + block.getNumBytes());
       }
 
-      if (datanode.blockScanner != null) {
-        datanode.blockScanner.addBlock(block);
-      }
       
     } catch (IOException ioe) {
       LOG.info("writeBlock " + block + " received exception " + ioe);
@@ -378,7 +396,7 @@
    */
   @Override
   protected void opBlockChecksum(DataInputStream in,
-      long blockId, long blockGs, AccessToken accessToken) throws IOException {
+      long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
     final Block block = new Block(blockId, 0 , blockGs);
     DataOutputStream out = new DataOutputStream(NetUtils.getOutputStream(s,
         datanode.socketWriteTimeout));
@@ -437,7 +455,7 @@
    */
   @Override
   protected void opCopyBlock(DataInputStream in,
-      long blockId, long blockGs, AccessToken accessToken) throws IOException {
+      long blockId, long blockGs, BlockAccessToken accessToken) throws IOException {
     // Read in the header
     Block block = new Block(blockId, 0, blockGs);
     if (datanode.isAccessTokenEnabled
@@ -508,7 +526,7 @@
   @Override
   protected void opReplaceBlock(DataInputStream in,
       long blockId, long blockGs, String sourceID, DatanodeInfo proxySource,
-      AccessToken accessToken) throws IOException {
+      BlockAccessToken accessToken) throws IOException {
     /* read header */
     final Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
         blockGs);
@@ -569,7 +587,7 @@
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          false, "", null, datanode);
+          null, 0, 0, 0, "", null, datanode);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
(original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
Sat Nov 28 20:05:56 2009
@@ -32,6 +32,8 @@
 import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+
 
 /**
  * Server used for receiving/sending a block of data.
@@ -115,11 +117,12 @@
     this.maxXceiverCount = conf.getInt("dfs.datanode.max.xcievers",
         MAX_XCEIVER_COUNT);
     
-    this.estimateBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
+    this.estimateBlockSize = conf.getLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
     
     //set up parameter for cluster balancing
     this.balanceThrottler = new BlockBalanceThrottler(
-      conf.getLong("dfs.balance.bandwidthPerSec", 1024L*1024));
+      conf.getLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, 
+                   DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT));
   }
 
   /**

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
(original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
Sat Nov 28 20:05:56 2009
@@ -32,12 +32,12 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.security.BlockAccessToken;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessToken;
-import org.apache.hadoop.security.AccessTokenHandler;
 import org.apache.hadoop.util.StringUtils;
 
 class DatanodeJspHelper {
@@ -258,6 +258,10 @@
     out.print("<B>Total number of blocks: " + blocks.size() + "</B><br>");
     // generate a table and dump the info
     out.println("\n<table>");
+    
+    String namenodeHost = datanode.getNameNodeAddr().getHostName();
+    String namenodeHostName = InetAddress.getByName(namenodeHost).getCanonicalHostName();
+    
     for (LocatedBlock cur : blocks) {
       out.print("<tr>");
       final String blockidstring = Long.toString(cur.getBlock().getBlockId());
@@ -277,14 +281,18 @@
             + "&genstamp=" + cur.getBlock().getGenerationStamp()
             + "&namenodeInfoPort=" + namenodeInfoPort
             + "&chunkSizeToView=" + chunkSizeToView;
+
+        String blockInfoUrl = "http://" + namenodeHostName + ":"
+            + namenodeInfoPort
+            + "/block_info_xml.jsp?blockId=" + blockidstring;
         out.print("<td>&nbsp</td><td><a href=\"" + blockUrl + "\">"
-            + datanodeAddr + "</a></td>");
+            + datanodeAddr + "</a></td><td>"
+            + "<a href=\"" + blockInfoUrl + "\">View Block Info</a></td>");
       }
       out.println("</tr>");
     }
     out.println("</table>");
     out.print("<hr>");
-    String namenodeHost = datanode.getNameNodeAddr().getHostName();
     out.print("<br><a href=\"http://"
         + InetAddress.getByName(namenodeHost).getCanonicalHostName() + ":"
         + namenodeInfoPort + "/dfshealth.jsp\">Go back to DFS home</a>");
@@ -317,9 +325,10 @@
     final DFSClient dfs = new DFSClient(datanode.getNameNodeAddr(),
         JspHelper.conf);
 
-    AccessToken accessToken = AccessToken.DUMMY_TOKEN;
+    BlockAccessToken accessToken = BlockAccessToken.DUMMY_TOKEN;
     if (JspHelper.conf.getBoolean(
-        AccessTokenHandler.STRING_ENABLE_ACCESS_TOKEN, false)) {
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, 
+        DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_DEFAULT)) {
       List<LocatedBlock> blks = dfs.getNamenode().getBlockLocations(filename, 0,
           Long.MAX_VALUE).getLocatedBlocks();
       if (blks == null || blks.size() == 0) {
@@ -556,7 +565,7 @@
     LocatedBlock lastBlk = blocks.get(blocks.size() - 1);
     long blockSize = lastBlk.getBlock().getNumBytes();
     long blockId = lastBlk.getBlock().getBlockId();
-    AccessToken accessToken = lastBlk.getAccessToken();
+    BlockAccessToken accessToken = lastBlk.getAccessToken();
     long genStamp = lastBlk.getBlock().getGenerationStamp();
     DatanodeInfo chosenNode;
     try {
@@ -577,4 +586,4 @@
     out.print("</textarea>");
     dfs.close();
   }
-}
\ No newline at end of file
+}

Modified: hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java?rev=885143&r1=885142&r2=885143&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
(original)
+++ hadoop/hdfs/branches/HDFS-326/src/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
Sat Nov 28 20:05:56 2009
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
 
 /**
@@ -113,12 +114,12 @@
 
     @Override // Object
     public int hashCode() {
-      return 37 * 17 + (int) (blockId^(blockId>>>32));
+      return (int)(blockId^(blockId>>>32));
     }
 
     public long getGenStamp() {
       return metaFile != null ? Block.getGenerationStamp(metaFile.getName()) :
-        Block.GRANDFATHER_GENERATION_STAMP;
+        GenerationStamp.GRANDFATHER_GENERATION_STAMP;
     }
   }
 



Mime
View raw message