hadoop-hdfs-commits mailing list archives

From a..@apache.org
Subject svn commit: r1196458 [5/9] - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ ...
Date Wed, 02 Nov 2011 05:35:26 GMT
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Wed Nov  2 05:34:31 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 
 @InterfaceAudience.Private
@@ -171,4 +172,12 @@ public class BlockTokenIdentifier extend
     
     return cache;
   }
+  
+  @InterfaceAudience.Private
+  public static class Renewer extends Token.TrivialRenewer {
+    @Override
+    protected Text getKind() {
+      return KIND_NAME;
+    }
+  }
 }
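Token.TrivialRenewer subclasses only have to say which token kind they handle; for block tokens there is nothing to renew, so the Renewer above simply reports KIND_NAME. A minimal sketch of the same pattern for a hypothetical token kind follows (class name and kind string are illustrative, and registration through the usual META-INF/services entry for org.apache.hadoop.security.token.TokenRenewer is assumed):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.token.Token;

    // Illustrative only: a trivial renewer for a hypothetical token kind.
    // Like BlockTokenIdentifier.Renewer above, it merely identifies the token
    // kind it is responsible for; renew/cancel behaviour is inherited from
    // Token.TrivialRenewer.
    public class ExampleTokenRenewer extends Token.TrivialRenewer {
      private static final Text KIND_NAME = new Text("EXAMPLE_TOKEN");

      @Override
      protected Text getKind() {
        return KIND_NAME;
      }
    }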

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java Wed Nov  2 05:34:31 2011
@@ -43,7 +43,7 @@ public class ExportedBlockKeys implement
     this(false, 0, 0, new BlockKey(), new BlockKey[0]);
   }
 
-  ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
+  public ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
       long tokenLifetime, BlockKey currentKey, BlockKey[] allKeys) {
     this.isBlockTokenEnabled = isBlockTokenEnabled;
     this.keyUpdateInterval = keyUpdateInterval;

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java Wed Nov  2 05:34:31 2011
@@ -17,7 +17,16 @@
  */
 package org.apache.hadoop.hdfs.security.token.delegation;
 
+import java.net.InetSocketAddress;
+
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
 
 /**
@@ -26,6 +35,30 @@ import org.apache.hadoop.security.token.
 @InterfaceAudience.Private
 public class DelegationTokenSelector
     extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{
+  public static final String SERVICE_NAME_KEY = "hdfs.service.host_";
+
+  private static final DelegationTokenSelector INSTANCE = new DelegationTokenSelector();
+
+  /** Select the delegation token for hdfs from the ugi. */
+  public static Token<DelegationTokenIdentifier> selectHdfsDelegationToken(
+      final InetSocketAddress nnAddr, final UserGroupInformation ugi,
+      final Configuration conf) {
+    // this guesses the remote cluster's rpc service port.
+    // the current token design assumes it's the same as the local cluster's
+    // rpc port unless a config key is set.  there should be a way to
+    // automatically and correctly determine the value.
+    final String key = SERVICE_NAME_KEY + SecurityUtil.buildTokenService(nnAddr);
+    final String nnServiceName = conf.get(key);
+    
+    int nnRpcPort = NameNode.DEFAULT_PORT;
+    if (nnServiceName != null) {
+      nnRpcPort = NetUtils.createSocketAddr(nnServiceName, nnRpcPort).getPort(); 
+    }
+    
+    final Text serviceName = SecurityUtil.buildTokenService(
+        new InetSocketAddress(nnAddr.getHostName(), nnRpcPort));
+    return INSTANCE.selectToken(serviceName, ugi.getTokens());
+  }
 
   public DelegationTokenSelector() {
     super(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
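A hedged usage sketch of the new selectHdfsDelegationToken helper follows; the host names, port, and wrapper class are illustrative assumptions, not part of this patch:

    import java.io.IOException;
    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
    import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.security.SecurityUtil;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;

    public class SelectTokenExample {
      public static Token<DelegationTokenIdentifier> pickToken(Configuration conf)
          throws IOException {
        // Hypothetical remote namenode address.
        InetSocketAddress nnAddr =
            NetUtils.createSocketAddr("remote-nn.example.com:8020");
        // Optional override for when the remote cluster's RPC service name
        // differs from the local default port; the key is SERVICE_NAME_KEY
        // plus the token service for the namenode address.
        conf.set(DelegationTokenSelector.SERVICE_NAME_KEY
            + SecurityUtil.buildTokenService(nnAddr), "remote-nn.example.com:9000");
        UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
        return DelegationTokenSelector.selectHdfsDelegationToken(nnAddr, ugi, conf);
      }
    }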

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Nov  2 05:34:31 2011
@@ -358,7 +358,8 @@ public class Balancer {
       if (response.getStatus() != Status.SUCCESS) {
         if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
           throw new IOException("block move failed due to access token error");
-        throw new IOException("block move is failed");
+        throw new IOException("block move is failed: " +
+            response.getMessage());
       }
     }
 
@@ -823,7 +824,7 @@ public class Balancer {
       cluster.add(datanode);
       BalancerDatanode datanodeS;
       final double avg = policy.getAvgUtilization();
-      if (policy.getUtilization(datanode) > avg) {
+      if (policy.getUtilization(datanode) >= avg) {
         datanodeS = new Source(datanode, policy, threshold);
         if (isAboveAvgUtilized(datanodeS)) {
           this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
@@ -1261,12 +1262,12 @@ public class Balancer {
     return datanode.utilization > (policy.getAvgUtilization()+threshold);
   }
   
-  /* Return true if the given datanode is above average utilized
+  /* Return true if the given datanode is at or above average utilized
    * but not overUtilized */
   private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
     final double avg = policy.getAvgUtilization();
     return (datanode.utilization <= (avg+threshold))
-        && (datanode.utilization > avg);
+        && (datanode.utilization >= avg);
   }
   
   /* Return true if the given datanode is underUtilized */
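For reference, a standalone sketch of the utilization buckets implied by the changes above; the method and bucket names are illustrative, and the two lower boundaries are assumed symmetric with the upper ones. The effect of the new ">= avg" comparison is that a node sitting exactly at the average utilization is now treated as a potential source rather than a target.

    // Illustrative classification only, not Balancer's actual API.
    static String classify(double utilization, double avg, double threshold) {
      if (utilization > avg + threshold) {
        return "overUtilized";        // well above average: must shed blocks
      } else if (utilization >= avg) {
        return "aboveAvgUtilized";    // at or above average: candidate source
      } else if (utilization >= avg - threshold) {
        return "belowAvgUtilized";    // below average: candidate target
      } else {
        return "underUtilized";       // well below average: should receive blocks
      }
    }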

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Nov  2 05:34:31 2011
@@ -53,7 +53,7 @@ import org.apache.hadoop.hdfs.security.t
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
@@ -72,8 +72,6 @@ import com.google.common.annotations.Vis
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
- * This class is a helper class for {@link FSNamesystem} and requires several
- * methods to be called with lock held on {@link FSNamesystem}.
  */
 @InterfaceAudience.Private
 public class BlockManager {
@@ -176,15 +174,16 @@ public class BlockManager {
   /** for block replicas placement */
   private BlockPlacementPolicy blockplacement;
   
-  public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
-    namesystem = fsn;
-    datanodeManager = new DatanodeManager(this, fsn, conf);
+  public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
+      final Configuration conf) throws IOException {
+    this.namesystem = namesystem;
+    datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
     invalidateBlocks = new InvalidateBlocks(datanodeManager);
 
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
     blockplacement = BlockPlacementPolicy.getInstance(
-        conf, fsn, datanodeManager.getNetworkTopology());
+        conf, stats, datanodeManager.getNetworkTopology());
     pendingReplications = new PendingReplicationBlocks(conf.getInt(
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
       DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
@@ -2580,7 +2579,7 @@ public class BlockManager {
 
     workFound = this.computeReplicationWork(blocksToProcess);
 
-    // Update FSNamesystemMetrics counters
+    // Update counters
     namesystem.writeLock();
     try {
       this.updateState();

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Nov  2 05:34:31 2011
@@ -50,8 +50,8 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.util.CyclicIteration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
@@ -78,7 +79,7 @@ import org.apache.hadoop.util.Reflection
 public class DatanodeManager {
   static final Log LOG = LogFactory.getLog(DatanodeManager.class);
 
-  private final FSNamesystem namesystem;
+  private final Namesystem namesystem;
   private final BlockManager blockManager;
 
   private final HeartbeatManager heartbeatManager;
@@ -124,12 +125,12 @@ public class DatanodeManager {
   final int blockInvalidateLimit;
   
   DatanodeManager(final BlockManager blockManager,
-      final FSNamesystem namesystem, final Configuration conf
+      final Namesystem namesystem, final Configuration conf
       ) throws IOException {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
 
-    this.heartbeatManager = new HeartbeatManager(namesystem, conf);
+    this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
 
     this.hostsReader = new HostsFileReader(
         conf.get(DFSConfigKeys.DFS_HOSTS, ""),
@@ -163,7 +164,8 @@ public class DatanodeManager {
   private Daemon decommissionthread = null;
 
   void activate(final Configuration conf) {
-    this.decommissionthread = new Daemon(new DecommissionManager(namesystem).new Monitor(
+    final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
+    this.decommissionthread = new Daemon(dm.new Monitor(
         conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 
                     DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
         conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY, 
@@ -859,7 +861,7 @@ public class DatanodeManager {
         try {
           nodeinfo = getDatanode(nodeReg);
         } catch(UnregisteredNodeException e) {
-          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+          return new DatanodeCommand[]{RegisterCommand.REGISTER};
         }
         
         // Check if this datanode should actually be shutdown instead. 
@@ -869,7 +871,7 @@ public class DatanodeManager {
         }
          
         if (nodeinfo == null || !nodeinfo.isAlive) {
-          return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+          return new DatanodeCommand[]{RegisterCommand.REGISTER};
         }
 
         heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Wed Nov  2 05:34:31 2011
@@ -56,4 +56,7 @@ public interface DatanodeStatistics {
    * The block related entries are set to -1.
    */
   public long[] getStats();
+
+  /** @return the number of expired heartbeats */
+  public int getExpiredHeartbeats();
 }
\ No newline at end of file

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java Wed Nov  2 05:34:31 2011
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 
 /**
  * Manage node decommissioning.
@@ -33,10 +33,13 @@ import org.apache.hadoop.hdfs.server.nam
 class DecommissionManager {
   static final Log LOG = LogFactory.getLog(DecommissionManager.class);
 
-  private final FSNamesystem fsnamesystem;
+  private final Namesystem namesystem;
+  private final BlockManager blockmanager;
 
-  DecommissionManager(final FSNamesystem namesystem) {
-    this.fsnamesystem = namesystem;
+  DecommissionManager(final Namesystem namesystem,
+      final BlockManager blockmanager) {
+    this.namesystem = namesystem;
+    this.blockmanager = blockmanager;
   }
 
   /** Periodically check decommission status. */
@@ -61,12 +64,12 @@ class DecommissionManager {
      */
     @Override
     public void run() {
-      for(; fsnamesystem.isRunning(); ) {
-        fsnamesystem.writeLock();
+      for(; namesystem.isRunning(); ) {
+        namesystem.writeLock();
         try {
           check();
         } finally {
-          fsnamesystem.writeUnlock();
+          namesystem.writeUnlock();
         }
   
         try {
@@ -78,7 +81,7 @@ class DecommissionManager {
     }
     
     private void check() {
-      final DatanodeManager dm = fsnamesystem.getBlockManager().getDatanodeManager();
+      final DatanodeManager dm = blockmanager.getDatanodeManager();
       int count = 0;
       for(Map.Entry<String, DatanodeDescriptor> entry
           : dm.getDatanodeCyclicIteration(firstkey)) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Wed Nov  2 05:34:31 2011
@@ -27,7 +27,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.util.Daemon;
 
 /**
@@ -55,14 +55,17 @@ class HeartbeatManager implements Datano
   /** Heartbeat monitor thread */
   private final Daemon heartbeatThread = new Daemon(new Monitor());
 
-  final FSNamesystem namesystem;
+  final Namesystem namesystem;
+  final BlockManager blockManager;
 
-  HeartbeatManager(final FSNamesystem namesystem, final Configuration conf) {
+  HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
+      final Configuration conf) {
     this.heartbeatRecheckInterval = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
         DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
 
     this.namesystem = namesystem;
+    this.blockManager = blockManager;
   }
 
   void activate(Configuration conf) {
@@ -136,6 +139,11 @@ class HeartbeatManager implements Datano
                        getBlockPoolUsed()};
   }
 
+  @Override
+  public synchronized int getExpiredHeartbeats() {
+    return stats.expiredHeartbeats;
+  }
+
   synchronized void register(final DatanodeDescriptor d) {
     if (!datanodes.contains(d)) {
       addDatanode(d);
@@ -191,7 +199,7 @@ class HeartbeatManager implements Datano
    * effect causes more datanodes to be declared dead.
    */
   void heartbeatCheck() {
-    final DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager();
+    final DatanodeManager dm = blockManager.getDatanodeManager();
     // It's OK to check safe mode w/o taking the lock here, we re-check
     // for safe mode after taking the lock before removing a datanode.
     if (namesystem.isInSafeMode()) {
@@ -204,7 +212,7 @@ class HeartbeatManager implements Datano
       synchronized(this) {
         for (DatanodeDescriptor d : datanodes) {
           if (dm.isDatanodeDead(d)) {
-            namesystem.incrExpiredHeartbeats();
+            stats.incrExpiredHeartbeats();
             dead = d;
             break;
           }
@@ -244,8 +252,7 @@ class HeartbeatManager implements Datano
             heartbeatCheck();
             lastHeartbeatCheck = now;
           }
-          if (namesystem.getBlockManager().shouldUpdateBlockKey(
-              now - lastBlockKeyUpdate)) {
+          if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
             synchronized(HeartbeatManager.this) {
               for(DatanodeDescriptor d : datanodes) {
                 d.needKeyUpdate = true;
@@ -274,6 +281,8 @@ class HeartbeatManager implements Datano
     private long blockPoolUsed = 0L;
     private int xceiverCount = 0;
 
+    private int expiredHeartbeats = 0;
+
     private void add(final DatanodeDescriptor node) {
       capacityUsed += node.getDfsUsed();
       blockPoolUsed += node.getBlockPoolUsed();
@@ -297,5 +306,10 @@ class HeartbeatManager implements Datano
         capacityTotal -= node.getDfsUsed();
       }
     }
+    
+    /** Increment expired heartbeat counter. */
+    private void incrExpiredHeartbeats() {
+      expiredHeartbeats++;
+    }
   }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java Wed Nov  2 05:34:31 2011
@@ -26,19 +26,66 @@ import java.util.TreeSet;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
-/** Keep track of under replication blocks.
- * Blocks have replication priority, with priority 0 indicating the highest
- * Blocks have only one replicas has the highest
+/**
+ * Keep prioritized queues of under replicated blocks.
+ * Blocks have replication priority, with priority {@link #QUEUE_HIGHEST_PRIORITY}
+ * indicating the highest priority.
+ * </p>
+ * Having a prioritised queue allows the {@link BlockManager} to select
+ * which blocks to replicate first -it tries to give priority to data
+ * that is most at risk or considered most valuable.
+ *
+ * <p/>
+ * The policy for choosing which priority to give added blocks
+ * is implemented in {@link #getPriority(Block, int, int, int)}.
+ * </p>
+ * <p>The queue order is as follows:</p>
+ * <ol>
+ *   <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that must be replicated
+ *   first. That is blocks with only one copy, or blocks with zero live
+ *   copies but a copy in a node being decommissioned. These blocks
+ *   are at risk of loss if the disk or server on which they
+ *   remain fails.</li>
+ *   <li>{@link #QUEUE_VERY_UNDER_REPLICATED}: blocks that are very
+ *   under-replicated compared to their expected values. Currently
+ *   that means the ratio of actual to expected replicas is
+ *   <i>less than</i> 1:3. These blocks may not be at risk,
+ *   but they are clearly considered "important".</li>
+ *   <li>{@link #QUEUE_UNDER_REPLICATED}: blocks that are also under
+ *   replicated, and the ratio of actual:expected is good enough that
+ *   they do not need to go into the {@link #QUEUE_VERY_UNDER_REPLICATED}
+ *   queue.</li>
+ *   <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are at least as
+ *   many copies of a block as required, but the blocks are not adequately
+ *   distributed. Loss of a rack/switch could take all copies off-line.</li>
+ *   <li>{@link #QUEUE_WITH_CORRUPT_BLOCKS} This is for blocks that are corrupt
+ *   and for which there are no non-corrupt copies (currently) available.
+ *   The policy here is to keep those corrupt blocks replicated, but give
+ *   blocks that are not corrupt higher priority.</li>
+ * </ol>
  */
 class UnderReplicatedBlocks implements Iterable<Block> {
+  /** The total number of queues : {@value} */
   static final int LEVEL = 5;
+  /** The queue with the highest priority: {@value} */
+  static final int QUEUE_HIGHEST_PRIORITY = 0;
+  /** The queue for blocks that are way below their expected value : {@value} */
+  static final int QUEUE_VERY_UNDER_REPLICATED = 1;
+  /** The queue for "normally" under-replicated blocks: {@value} */
+  static final int QUEUE_UNDER_REPLICATED = 2;
+  /** The queue for blocks that have the right number of replicas,
+   * but which the block manager felt were badly distributed: {@value}
+   */
+  static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
+  /** The queue for corrupt blocks: {@value} */
   static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
+  /** the queues themselves */
   private final List<NavigableSet<Block>> priorityQueues
-      = new ArrayList<NavigableSet<Block>>();
-      
+      = new ArrayList<NavigableSet<Block>>(LEVEL);
+
   /** Create an object. */
   UnderReplicatedBlocks() {
-    for(int i=0; i<LEVEL; i++) {
+    for (int i = 0; i < LEVEL; i++) {
       priorityQueues.add(new TreeSet<Block>());
     }
   }
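A standalone restatement of the policy documented in the class javadoc above (the same logic appears in getPriority() later in this patch); illustrative only, using the constants just declared:

    static int priorityFor(int curReplicas, int decommissionedReplicas,
        int expectedReplicas) {
      if (curReplicas >= expectedReplicas) {
        return QUEUE_REPLICAS_BADLY_DISTRIBUTED;   // enough copies, poor spread
      } else if (curReplicas == 0) {
        // No live replicas: decommissioned copies left -> highest priority,
        // otherwise only corrupt copies remain.
        return decommissionedReplicas > 0
            ? QUEUE_HIGHEST_PRIORITY : QUEUE_WITH_CORRUPT_BLOCKS;
      } else if (curReplicas == 1) {
        return QUEUE_HIGHEST_PRIORITY;             // a single copy is at risk
      } else if (curReplicas * 3 < expectedReplicas) {
        return QUEUE_VERY_UNDER_REPLICATED;        // under a third of expected
      } else {
        return QUEUE_UNDER_REPLICATED;
      }
    }

    // e.g. priorityFor(1, 0, 3)  == QUEUE_HIGHEST_PRIORITY
    //      priorityFor(0, 2, 3)  == QUEUE_HIGHEST_PRIORITY
    //      priorityFor(2, 0, 10) == QUEUE_VERY_UNDER_REPLICATED
    //      priorityFor(3, 0, 3)  == QUEUE_REPLICAS_BADLY_DISTRIBUTED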
@@ -47,7 +94,7 @@ class UnderReplicatedBlocks implements I
    * Empty the queues.
    */
   void clear() {
-    for(int i=0; i<LEVEL; i++) {
+    for (int i = 0; i < LEVEL; i++) {
       priorityQueues.get(i).clear();
     }
   }
@@ -55,7 +102,7 @@ class UnderReplicatedBlocks implements I
   /** Return the total number of under replication blocks */
   synchronized int size() {
     int size = 0;
-    for (int i=0; i<LEVEL; i++) {
+    for (int i = 0; i < LEVEL; i++) {
       size += priorityQueues.get(i).size();
     }
     return size;
@@ -64,12 +111,14 @@ class UnderReplicatedBlocks implements I
   /** Return the number of under replication blocks excluding corrupt blocks */
   synchronized int getUnderReplicatedBlockCount() {
     int size = 0;
-    for (int i=0; i<QUEUE_WITH_CORRUPT_BLOCKS; i++) {
-      size += priorityQueues.get(i).size();
+    for (int i = 0; i < LEVEL; i++) {
+      if (i != QUEUE_WITH_CORRUPT_BLOCKS) {
+        size += priorityQueues.get(i).size();
+      }
     }
     return size;
   }
-  
+
   /** Return the number of corrupt blocks */
   synchronized int getCorruptBlockSize() {
     return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
@@ -77,47 +126,58 @@ class UnderReplicatedBlocks implements I
   
   /** Check if a block is in the neededReplication queue */
   synchronized boolean contains(Block block) {
-    for(NavigableSet<Block> set : priorityQueues) {
-      if(set.contains(block)) { return true; }
+    for (NavigableSet<Block> set : priorityQueues) {
+      if (set.contains(block)) {
+        return true;
+      }
     }
     return false;
   }
-      
+
   /** Return the priority of a block
-   * @param block a under replication block
+   * @param block an under replicated block
    * @param curReplicas current number of replicas of the block
    * @param expectedReplicas expected number of replicas of the block
+   * @return the priority for the block, between 0 and ({@link #LEVEL}-1)
    */
-  private int getPriority(Block block, 
+  private int getPriority(Block block,
                           int curReplicas, 
                           int decommissionedReplicas,
                           int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
     if (curReplicas >= expectedReplicas) {
-      return 3; // Block doesn't have enough racks
-    } else if(curReplicas==0) {
-      // If there are zero non-decommissioned replica but there are
+      // Block has enough copies, but not enough racks
+      return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
+    } else if (curReplicas == 0) {
+      // If there are zero non-decommissioned replicas but there are
       // some decommissioned replicas, then assign them highest priority
       if (decommissionedReplicas > 0) {
-        return 0;
+        return QUEUE_HIGHEST_PRIORITY;
       }
-      return QUEUE_WITH_CORRUPT_BLOCKS; // keep these blocks in needed replication.
-    } else if(curReplicas==1) {
-      return 0; // highest priority
-    } else if(curReplicas*3<expectedReplicas) {
-      return 1;
+      //all we have are corrupt blocks
+      return QUEUE_WITH_CORRUPT_BLOCKS;
+    } else if (curReplicas == 1) {
+      //only one replica - risk of loss
+      // highest priority
+      return QUEUE_HIGHEST_PRIORITY;
+    } else if ((curReplicas * 3) < expectedReplicas) {
+      //there are fewer than a third as many replicas as expected;
+      //this is considered very under-replicated
+      return QUEUE_VERY_UNDER_REPLICATED;
     } else {
-      return 2;
+      //add to the normal queue for under replicated blocks
+      return QUEUE_UNDER_REPLICATED;
     }
   }
-      
+
   /** add a block to a under replication queue according to its priority
    * @param block a under replication block
    * @param curReplicas current number of replicas of the block
+   * @param decomissionedReplicas the number of decommissioned replicas
    * @param expectedReplicas expected number of replicas of the block
+   * @return true if the block was added to a queue.
    */
-  synchronized boolean add(
-                           Block block,
+  synchronized boolean add(Block block,
                            int curReplicas, 
                            int decomissionedReplicas,
                            int expectedReplicas) {
@@ -129,7 +189,7 @@ class UnderReplicatedBlocks implements I
         NameNode.stateChangeLog.debug(
           "BLOCK* NameSystem.UnderReplicationBlock.add:"
           + block
-          + " has only "+curReplicas
+          + " has only " + curReplicas
           + " replicas and need " + expectedReplicas
           + " replicas so is added to neededReplications"
           + " at priority level " + priLevel);
@@ -149,8 +209,22 @@ class UnderReplicatedBlocks implements I
                                oldExpectedReplicas);
     return remove(block, priLevel);
   }
-      
-  /** remove a block from a under replication queue given a priority*/
+
+  /**
+   * Remove a block from the under replication queues.
+   *
+   * The priLevel parameter is a hint of which queue to query
+   * first: if negative or &gt;= {@link #LEVEL} this shortcutting
+   * is not attempted.
+   *
+   * If the block is not found in the nominated queue, an attempt is made to
+   * remove it from all queues.
+   *
+   * <i>Warning:</i> This is not a synchronized method.
+   * @param block block to remove
+   * @param priLevel expected priority level
+   * @return true if the block was found and removed from one of the priority queues
+   */
   boolean remove(Block block, int priLevel) {
     if(priLevel >= 0 && priLevel < LEVEL 
         && priorityQueues.get(priLevel).remove(block)) {
@@ -164,8 +238,8 @@ class UnderReplicatedBlocks implements I
     } else {
       // Try to remove the block from all queues if the block was
       // not found in the queue for the given priority level.
-      for(int i=0; i<LEVEL; i++) {
-        if(priorityQueues.get(i).remove(block)) {
+      for (int i = 0; i < LEVEL; i++) {
+        if (priorityQueues.get(i).remove(block)) {
           if(NameNode.stateChangeLog.isDebugEnabled()) {
             NameNode.stateChangeLog.debug(
               "BLOCK* NameSystem.UnderReplicationBlock.remove: "
@@ -178,9 +252,24 @@ class UnderReplicatedBlocks implements I
     }
     return false;
   }
-      
-  /** update the priority level of a block */
-  synchronized void update(Block block, int curReplicas, 
+
+  /**
+   * Recalculate and potentially update the priority level of a block.
+   *
+   * If the block priority has changed from before, an attempt is made to
+   * remove it from the block queue. Regardless of whether or not the block
+   * was in a queue of the (recalculated) priority, an attempt is made
+   * to add it to that queue. This ensures that the block will be
+   * in its expected priority queue (and only that queue) by the end of the
+   * method call.
+   * @param block an under replicated block
+   * @param curReplicas current number of replicas of the block
+   * @param decommissionedReplicas  the number of decommissioned replicas
+   * @param curExpectedReplicas expected number of replicas of the block
+   * @param curReplicasDelta the change in the replica count from before
+   * @param expectedReplicasDelta the change in the expected replica count from before
+   */
+  synchronized void update(Block block, int curReplicas,
                            int decommissionedReplicas,
                            int curExpectedReplicas,
                            int curReplicasDelta, int expectedReplicasDelta) {
@@ -206,7 +295,7 @@ class UnderReplicatedBlocks implements I
         NameNode.stateChangeLog.debug(
           "BLOCK* NameSystem.UnderReplicationBlock.update:"
           + block
-          + " has only "+curReplicas
+          + " has only "+ curReplicas
           + " replicas and needs " + curExpectedReplicas
           + " replicas so is added to neededReplications"
           + " at priority level " + curPri);
@@ -218,17 +307,24 @@ class UnderReplicatedBlocks implements I
   synchronized BlockIterator iterator(int level) {
     return new BlockIterator(level);
   }
-    
+
   /** return an iterator of all the under replication blocks */
+  @Override
   public synchronized BlockIterator iterator() {
     return new BlockIterator();
   }
-  
+
+  /**
+   * An iterator over blocks.
+   */
   class BlockIterator implements Iterator<Block> {
     private int level;
     private boolean isIteratorForLevel = false;
     private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
 
+    /**
+     * Construct an iterator over all queues.
+     */
     private BlockIterator() {
       level=0;
       for(int i=0; i<LEVEL; i++) {
@@ -236,6 +332,10 @@ class UnderReplicatedBlocks implements I
       }
     }
 
+    /**
+     * Construct an iterator over a single queue level
+     * @param l the priority level to iterate over
+     */
     private BlockIterator(int l) {
       level = l;
       isIteratorForLevel = true;
@@ -243,8 +343,9 @@ class UnderReplicatedBlocks implements I
     }
 
     private void update() {
-      if (isIteratorForLevel)
+      if (isIteratorForLevel) {
         return;
+      }
       while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
         level++;
       }
@@ -252,30 +353,33 @@ class UnderReplicatedBlocks implements I
 
     @Override
     public Block next() {
-      if (isIteratorForLevel)
+      if (isIteratorForLevel) {
         return iterators.get(0).next();
+      }
       update();
       return iterators.get(level).next();
     }
 
     @Override
     public boolean hasNext() {
-      if (isIteratorForLevel)
+      if (isIteratorForLevel) {
         return iterators.get(0).hasNext();
+      }
       update();
       return iterators.get(level).hasNext();
     }
 
     @Override
     public void remove() {
-      if (isIteratorForLevel) 
+      if (isIteratorForLevel) {
         iterators.get(0).remove();
-      else
+      } else {
         iterators.get(level).remove();
+      }
     }
 
     int getPriority() {
       return level;
     }
-  }  
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed Nov  2 05:34:31 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
 import org.apache.hadoop.hdfs.web.resources.DelegationParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
@@ -552,6 +553,13 @@ public class JspHelper {
         DataInputStream in = new DataInputStream(buf);
         DelegationTokenIdentifier id = new DelegationTokenIdentifier();
         id.readFields(in);
+        if (context != null) {
+          final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
+          if (nn != null) {
+            // Verify the token.
+            nn.getNamesystem().verifyToken(id, token.getPassword());
+          }
+        }
         ugi = id.getUser();
         checkUsername(ugi.getShortUserName(), usernameFromQuery);
         checkUsername(ugi.getShortUserName(), user);

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Wed Nov  2 05:34:31 2011
@@ -403,8 +403,8 @@ class BlockPoolSliceScanner {
       try {
         adjustThrottler();
         
-        blockSender = new BlockSender(block, 0, -1, false, false, true,
-            datanode, null);
+        blockSender = new BlockSender(block, 0, -1, false, true, datanode,
+            null);
 
         DataOutputStream out = 
                 new DataOutputStream(new IOUtils.NullOutputStream());

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Nov  2 05:34:31 2011
@@ -24,6 +24,7 @@ import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.FileDescriptor;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.PureJavaCrc32;
@@ -57,10 +59,13 @@ import org.apache.hadoop.util.PureJavaCr
 class BlockReceiver implements Closeable {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
+
+  private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
   
   private DataInputStream in = null; // from where data are read
   private DataChecksum checksum; // from where chunks of a block can be read
   private OutputStream out = null; // to block file at local disk
+  private FileDescriptor outFd;
   private OutputStream cout = null; // output stream for checksum file
   private DataOutputStream checksumOut = null; // to crc file at local disk
   private int bytesPerChecksum;
@@ -80,6 +85,11 @@ class BlockReceiver implements Closeable
   private final DataNode datanode;
   volatile private boolean mirrorError;
 
+  // Cache management state
+  private boolean dropCacheBehindWrites;
+  private boolean syncBehindWrites;
+  private long lastCacheDropOffset = 0;
+
   /** The client name.  It is empty if a datanode is the client */
   private final String clientname;
   private final boolean isClient; 
@@ -98,7 +108,8 @@ class BlockReceiver implements Closeable
       final BlockConstructionStage stage, 
       final long newGs, final long minBytesRcvd, final long maxBytesRcvd, 
       final String clientname, final DatanodeInfo srcDataNode,
-      final DataNode datanode) throws IOException {
+      final DataNode datanode, DataChecksum requestedChecksum)
+      throws IOException {
     try{
       this.block = block;
       this.in = in;
@@ -167,9 +178,11 @@ class BlockReceiver implements Closeable
         }
       }
       // read checksum meta information
-      this.checksum = DataChecksum.newDataChecksum(in);
+      this.checksum = requestedChecksum;
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
+      this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
+      this.syncBehindWrites = datanode.shouldSyncBehindWrites();
       
       final boolean isCreate = isDatanode || isTransfer 
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
@@ -177,6 +190,12 @@ class BlockReceiver implements Closeable
           this.bytesPerChecksum, this.checksumSize);
       if (streams != null) {
         this.out = streams.dataOut;
+        if (out instanceof FileOutputStream) {
+          this.outFd = ((FileOutputStream)out).getFD();
+        } else {
+          LOG.warn("Could not get file descriptor for outputstream of class " +
+              out.getClass());
+        }
         this.cout = streams.checksumOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
             streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
@@ -631,6 +650,8 @@ class BlockReceiver implements Closeable
           );
           
           datanode.metrics.incrBytesWritten(len);
+
+          dropOsCacheBehindWriter(offsetInBlock);
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);
@@ -645,10 +666,27 @@ class BlockReceiver implements Closeable
     return lastPacketInBlock?-1:len;
   }
 
-  void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
-    checksum.writeHeader(mirrorOut);
+  private void dropOsCacheBehindWriter(long offsetInBlock) throws IOException {
+    try {
+      if (outFd != null &&
+          offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
+        long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
+        if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
+          NativeIO.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
+              NativeIO.POSIX_FADV_DONTNEED);
+        }
+        
+        if (syncBehindWrites) {
+          NativeIO.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES,
+              NativeIO.SYNC_FILE_RANGE_WRITE);
+        }
+        
+        lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
+      }
+    } catch (Throwable t) {
+      LOG.warn("Couldn't drop os cache behind writer for " + block, t);
+    }
   }
- 
 
   void receiveBlock(
       DataOutputStream mirrOut, // output to next datanode
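The dropOsCacheBehindWriter() helper above drops already-written pages from the OS page cache once they are a full window behind the writer, and can ask the kernel to start writeback early via sync_file_range. A self-contained sketch of the same write-side pattern outside the datanode (class name and window size are assumptions; the NativeIO calls are the ones used above, and the *IfPossible variants are intended to be no-ops where native support is missing):

    import java.io.FileDescriptor;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.nativeio.NativeIO;

    // Illustrative drop-behind writer, not datanode code.
    public class DropBehindWriter {
      private static final long WINDOW = 8 * 1024 * 1024; // same 8MB lag as above

      public static void writeRepeatedly(FileOutputStream out, byte[] buf,
          long repetitions) throws IOException {
        FileDescriptor fd = out.getFD();
        long written = 0;
        long lastDrop = 0;
        for (long i = 0; i < repetitions; i++) {
          out.write(buf);
          written += buf.length;
          if (written > lastDrop + WINDOW) {
            if (lastDrop > 0) {
              // Everything up to lastDrop is at least one window behind the
              // writer and is unlikely to be read back soon: drop it.
              NativeIO.posixFadviseIfPossible(fd, 0, lastDrop,
                  NativeIO.POSIX_FADV_DONTNEED);
            }
            lastDrop += WINDOW;
          }
        }
      }
    }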

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Wed Nov  2 05:34:31 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.FileDescriptor;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -118,7 +122,9 @@ class BlockSender implements java.io.Clo
   private DataInputStream checksumIn;
   /** Checksum utility */
   private final DataChecksum checksum;
-  /** Starting position to read */
+  /** Initial position to read */
+  private long initialOffset;
+  /** Current position of read */
   private long offset;
   /** Position of last byte to read from block file */
   private final long endOffset;
@@ -128,8 +134,6 @@ class BlockSender implements java.io.Clo
   private final int checksumSize;
   /** If true, failure to read checksum is ignored */
   private final boolean corruptChecksumOk;
-  /** true if chunk offset is needed to be sent in Checksum header */
-  private final boolean chunkOffsetOK;
   /** Sequence number of packet being sent */
   private long seqno;
   /** Set to true if transferTo is allowed for sending data to the client */
@@ -142,6 +146,24 @@ class BlockSender implements java.io.Clo
   private final String clientTraceFmt;
   private volatile ChunkChecksum lastChunkChecksum = null;
   
+  /** The file descriptor of the block being sent */
+  private FileDescriptor blockInFd;
+
+  // Cache-management related fields
+  private final long readaheadLength;
+  private boolean shouldDropCacheBehindRead;
+  private ReadaheadRequest curReadahead;
+  private long lastCacheDropOffset;
+  private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+  /**
+   * Minimum length of read below which management of the OS
+   * buffer cache is disabled.
+   */
+  private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
+  
+  private static ReadaheadPool readaheadPool =
+    ReadaheadPool.getInstance();
+
   /**
    * Constructor
    * 
@@ -149,22 +171,22 @@ class BlockSender implements java.io.Clo
    * @param startOffset starting offset to read from
    * @param length length of data to read
    * @param corruptChecksumOk
-   * @param chunkOffsetOK need to send check offset in checksum header
    * @param verifyChecksum verify checksum while reading the data
    * @param datanode datanode from which the block is being read
    * @param clientTraceFmt format string used to print client trace logs
    * @throws IOException
    */
   BlockSender(ExtendedBlock block, long startOffset, long length,
-              boolean corruptChecksumOk, boolean chunkOffsetOK,
-              boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
+              boolean corruptChecksumOk, boolean verifyChecksum,
+              DataNode datanode, String clientTraceFmt)
       throws IOException {
     try {
       this.block = block;
-      this.chunkOffsetOK = chunkOffsetOK;
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
       this.clientTraceFmt = clientTraceFmt;
+      this.readaheadLength = datanode.getReadaheadLength();
+      this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
       
       synchronized(datanode.data) { 
         this.replica = getReplica(block, datanode);
@@ -277,6 +299,11 @@ class BlockSender implements java.io.Clo
         DataNode.LOG.debug("replica=" + replica);
       }
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+      if (blockIn instanceof FileInputStream) {
+        blockInFd = ((FileInputStream)blockIn).getFD();
+      } else {
+        blockInFd = null;
+      }
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
       IOUtils.closeStream(blockIn);
@@ -288,6 +315,20 @@ class BlockSender implements java.io.Clo
    * close opened files.
    */
   public void close() throws IOException {
+    if (blockInFd != null && shouldDropCacheBehindRead) {
+      // drop the last few MB of the file from cache
+      try {
+        NativeIO.posixFadviseIfPossible(
+            blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
+            NativeIO.POSIX_FADV_DONTNEED);
+      } catch (Exception e) {
+        LOG.warn("Unable to drop cache on file close", e);
+      }
+    }
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
+    
     IOException ioe = null;
     if(checksumIn!=null) {
       try {
@@ -304,6 +345,7 @@ class BlockSender implements java.io.Clo
         ioe = e;
       }
       blockIn = null;
+      blockInFd = null;
     }
     // throw IOException if there is any
     if(ioe!= null) {
@@ -538,14 +580,22 @@ class BlockSender implements java.io.Clo
     if (out == null) {
       throw new IOException( "out stream is null" );
     }
-    final long initialOffset = offset;
+    initialOffset = offset;
     long totalRead = 0;
     OutputStream streamForSendChunks = out;
     
+    lastCacheDropOffset = initialOffset;
+
+    if (isLongRead() && blockInFd != null) {
+      // Advise that this file descriptor will be accessed sequentially.
+      NativeIO.posixFadviseIfPossible(blockInFd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL);
+    }
+    
+    // Trigger readahead of beginning of file if configured.
+    manageOsCache();
+
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
     try {
-      writeChecksumHeader(out);
-      
       int maxChunksPerPacket;
       int pktSize = PacketHeader.PKT_HEADER_LEN;
       boolean transferTo = transferToAllowed && !verifyChecksum
@@ -569,6 +619,7 @@ class BlockSender implements java.io.Clo
       ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
 
       while (endOffset > offset) {
+        manageOsCache();
         long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
             transferTo, throttler);
         offset += len;
@@ -595,22 +646,45 @@ class BlockSender implements java.io.Clo
     }
     return totalRead;
   }
-  
+
   /**
-   * Write checksum header to the output stream
+   * Manage the OS buffer cache by performing read-ahead
+   * and drop-behind.
    */
-  private void writeChecksumHeader(DataOutputStream out) throws IOException {
-    try {
-      checksum.writeHeader(out);
-      if (chunkOffsetOK) {
-        out.writeLong(offset);
+  private void manageOsCache() throws IOException {
+    if (!isLongRead() || blockInFd == null) {
+      // don't manage cache manually for short-reads, like
+      // HBase random read workloads.
+      return;
+    }
+
+    // Perform readahead if necessary
+    if (readaheadLength > 0 && readaheadPool != null) {
+      curReadahead = readaheadPool.readaheadStream(
+          clientTraceFmt, blockInFd,
+          offset, readaheadLength, Long.MAX_VALUE,
+          curReadahead);
+    }
+
+    // Drop what we've just read from cache, since we aren't
+    // likely to need it again
+    long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
+    if (shouldDropCacheBehindRead &&
+        offset >= nextCacheDropOffset) {
+      long dropLength = offset - lastCacheDropOffset;
+      if (dropLength >= 1024) {
+        NativeIO.posixFadviseIfPossible(blockInFd,
+            lastCacheDropOffset, dropLength,
+            NativeIO.POSIX_FADV_DONTNEED);
       }
-      out.flush();
-    } catch (IOException e) { //socket error
-      throw ioeToSocketException(e);
+      lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
     }
   }
-    
+
+  private boolean isLongRead() {
+    return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
+  }
+
   /**
    * Write packet header into {@code pkt}
    */
@@ -624,4 +698,19 @@ class BlockSender implements java.io.Clo
   boolean didSendEntireByteRange() {
     return sentEntireByteRange;
   }
+
+  /**
+   * @return the checksum type that will be used with this block transfer.
+   */
+  DataChecksum getChecksum() {
+    return checksum;
+  }
+
+  /**
+   * @return the offset into the block file where the sender is currently
+   * reading.
+   */
+  long getOffset() {
+    return offset;
+  }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Nov  2 05:34:31 2011
@@ -104,6 +104,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -123,6 +124,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeProtocolServerSideTranslatorR23;
 import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -150,12 +152,16 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
 import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolServerSideTranslatorR23;
+import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolTranslatorR23;
+import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeWireProtocol;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpServer;
@@ -413,6 +419,11 @@ public class DataNode extends Configured
   int socketTimeout;
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
+  private boolean dropCacheBehindWrites = false;
+  private boolean syncBehindWrites = false;
+  private boolean dropCacheBehindReads = false;
+  private long readaheadLength = 0;
+
   int writePacketSize = 0;
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
@@ -496,6 +507,20 @@ public class DataNode extends Configured
         DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
     this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
                                        DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+
+    this.readaheadLength = conf.getLong(
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+    this.dropCacheBehindWrites = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
+    this.syncBehindWrites = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
+    this.dropCacheBehindReads = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
+
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
     this.initialBlockReportDelay = conf.getLong(
@@ -554,7 +579,7 @@ public class DataNode extends Configured
     if (conf.getBoolean(DFS_WEBHDFS_ENABLED_KEY, DFS_WEBHDFS_ENABLED_DEFAULT)) {
       infoServer.addJerseyResourcePackage(DatanodeWebHdfsMethods.class
           .getPackage().getName() + ";" + Param.class.getPackage().getName(),
-          "/" + WebHdfsFileSystem.PATH_PREFIX + "/*");
+          WebHdfsFileSystem.PATH_PREFIX + "/*");
     }
     this.infoServer.start();
   }
@@ -576,13 +601,22 @@ public class DataNode extends Configured
     InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
         conf.get("dfs.datanode.ipc.address"));
     
-    // Add all the RPC protocols that the Datanode implements
-    ipcServer = RPC.getServer(ClientDatanodeProtocol.class, this, ipcAddr.getHostName(),
+    // Add all the RPC protocols that the Datanode implements    
+    ClientDatanodeProtocolServerSideTranslatorR23 
+        clientDatanodeProtocolServerTranslator = 
+          new ClientDatanodeProtocolServerSideTranslatorR23(this);
+    ipcServer = RPC.getServer(
+      org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class,
+      clientDatanodeProtocolServerTranslator, ipcAddr.getHostName(),
                               ipcAddr.getPort(), 
                               conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY, 
                                           DFS_DATANODE_HANDLER_COUNT_DEFAULT), 
                               false, conf, blockPoolTokenSecretManager);
-    ipcServer.addProtocol(InterDatanodeProtocol.class, this);
+    InterDatanodeProtocolServerSideTranslatorR23 
+        interDatanodeProtocolServerTranslator = 
+          new InterDatanodeProtocolServerSideTranslatorR23(this);
+    ipcServer.addProtocol(InterDatanodeWireProtocol.class, 
+        interDatanodeProtocolServerTranslator);
     
     // set service-level authorization security policy
     if (conf.getBoolean(
@@ -1137,8 +1171,15 @@ public class DataNode extends Configured
             if (!heartbeatsDisabledForTests) {
               DatanodeCommand[] cmds = sendHeartBeat();
               metrics.addHeartbeat(now() - startTime);
+
+              long startProcessCommands = now();
               if (!processCommand(cmds))
                 continue;
+              long endProcessCommands = now();
+              if (endProcessCommands - startProcessCommands > 2000) {
+                LOG.info("Took " + (endProcessCommands - startProcessCommands) +
+                    "ms to process " + cmds.length + " commands from NN");
+              }
             }
           }
           if (pendingReceivedRequests > 0
@@ -1412,7 +1453,7 @@ public class DataNode extends Configured
         }
         break;
       case DatanodeProtocol.DNA_FINALIZE:
-        storage.finalizeUpgrade(((DatanodeCommand.Finalize) cmd)
+        storage.finalizeUpgrade(((FinalizeCommand) cmd)
             .getBlockPoolId());
         break;
       case UpgradeCommand.UC_ACTION_START_UPGRADE:
@@ -1634,15 +1675,13 @@ public class DataNode extends Configured
     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
       InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
     }
-    UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+    final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
     try {
       return loginUgi
           .doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
             public InterDatanodeProtocol run() throws IOException {
-              return (InterDatanodeProtocol) RPC.getProxy(
-                  InterDatanodeProtocol.class, InterDatanodeProtocol.versionID,
-                  addr, UserGroupInformation.getCurrentUser(), conf,
-                  NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+              return new InterDatanodeProtocolTranslatorR23(addr, loginUgi,
+                  conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout);
             }
           });
     } catch (InterruptedException ie) {
@@ -1878,7 +1917,7 @@ public class DataNode extends Configured
       nn.reportBadBlocks(new LocatedBlock[]{
           new LocatedBlock(block, new DatanodeInfo[] {
               new DatanodeInfo(bpReg)})});
-      LOG.info("Can't replicate block " + block
+      LOG.warn("Can't replicate block " + block
           + " because on-disk length " + onDiskLength 
           + " is shorter than NameNode recorded length " + block.getNumBytes());
       return;
@@ -2058,7 +2097,7 @@ public class DataNode extends Configured
         out = new DataOutputStream(new BufferedOutputStream(baseStream,
             HdfsConstants.SMALL_BUFFER_SIZE));
         blockSender = new BlockSender(b, 0, b.getNumBytes(), 
-            false, false, false, DataNode.this, null);
+            false, false, DataNode.this, null);
         DatanodeInfo srcNode = new DatanodeInfo(bpReg);
 
         //
@@ -2071,7 +2110,7 @@ public class DataNode extends Configured
         }
 
         new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
-            stage, 0, 0, 0, 0);
+            stage, 0, 0, 0, 0, blockSender.getChecksum());
 
         // send data & checksum
         blockSender.sendBlock(out, baseStream, null);
@@ -2884,4 +2923,20 @@ public class DataNode extends Configured
                        (DataXceiverServer) this.dataXceiverServer.getRunnable();
     return dxcs.balanceThrottler.getBandwidth();
   }
+
+  long getReadaheadLength() {
+    return readaheadLength;
+  }
+
+  boolean shouldDropCacheBehindWrites() {
+    return dropCacheBehindWrites;
+  }
+
+  boolean shouldDropCacheBehindReads() {
+    return dropCacheBehindReads;
+  }
+  
+  boolean shouldSyncBehindWrites() {
+    return syncBehindWrites;
+  }
 }
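
The constructor changes above read four new cache-tuning knobs via DFSConfigKeys. A small sketch of pulling the same settings into a standalone holder, assuming only the key and default constants referenced in the diff; the holder class itself is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

class CacheTuningSettings {
  final long readaheadLength;
  final boolean dropCacheBehindWrites;
  final boolean syncBehindWrites;
  final boolean dropCacheBehindReads;

  CacheTuningSettings(Configuration conf) {
    // Same keys and defaults the DataNode constructor reads in the patch.
    readaheadLength = conf.getLong(
        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
    dropCacheBehindWrites = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
    syncBehindWrites = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
    dropCacheBehindReads = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
  }
}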

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Nov  2 05:34:31 2011
@@ -44,12 +44,16 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -92,7 +96,6 @@ class DataXceiver extends Receiver imple
     this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
     this.datanode = datanode;
     this.dataXceiverServer = dataXceiverServer;
-    dataXceiverServer.childSockets.put(s, s);
     remoteAddress = s.getRemoteSocketAddress().toString();
     localAddress = s.getLocalSocketAddress().toString();
 
@@ -129,6 +132,7 @@ class DataXceiver extends Receiver imple
   public void run() {
     int opsProcessed = 0;
     Op op = null;
+    dataXceiverServer.childSockets.put(s, s);
     try {
       int stdTimeout = s.getSoTimeout();
 
@@ -223,15 +227,17 @@ class DataXceiver extends Receiver imple
     try {
       try {
         blockSender = new BlockSender(block, blockOffset, length,
-            true, true, false, datanode, clientTraceFmt);
+            true, false, datanode, clientTraceFmt);
       } catch(IOException e) {
-        LOG.info("opReadBlock " + block + " received exception " + e);
-        sendResponse(s, ERROR, datanode.socketWriteTimeout);
+        String msg = "opReadBlock " + block + " received exception " + e; 
+        LOG.info(msg);
+        sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
         throw e;
       }
       
       // send op status
-      sendResponse(s, SUCCESS, datanode.socketWriteTimeout);
+      writeSuccessWithChecksumInfo(blockSender,
+          getStreamWithTimeout(s, datanode.socketWriteTimeout));
 
       long read = blockSender.sendBlock(out, baseStream, null); // send data
 
@@ -289,7 +295,8 @@ class DataXceiver extends Receiver imple
       final int pipelineSize,
       final long minBytesRcvd,
       final long maxBytesRcvd,
-      final long latestGenerationStamp) throws IOException {
+      final long latestGenerationStamp,
+      DataChecksum requestedChecksum) throws IOException {
     updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
     final boolean isDatanode = clientname.length() == 0;
     final boolean isClient = !isDatanode;
@@ -348,7 +355,7 @@ class DataXceiver extends Receiver imple
             s.getRemoteSocketAddress().toString(),
             s.getLocalSocketAddress().toString(),
             stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
-            clientname, srcDataNode, datanode);
+            clientname, srcDataNode, datanode, requestedChecksum);
       } else {
         datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
       }
@@ -378,11 +385,8 @@ class DataXceiver extends Receiver imple
 
           new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
               clientname, targets, srcDataNode, stage, pipelineSize,
-              minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
+              minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
 
-          if (blockReceiver != null) { // send checksum header
-            blockReceiver.writeChecksumHeader(mirrorOut);
-          }
           mirrorOut.flush();
 
           // read connect ack (only for clients, not for replication req)
@@ -452,7 +456,7 @@ class DataXceiver extends Receiver imple
           if (LOG.isTraceEnabled()) {
             LOG.trace("TRANSFER: send close-ack");
           }
-          writeResponse(SUCCESS, replyOut);
+          writeResponse(SUCCESS, null, replyOut);
         }
       }
 
@@ -507,7 +511,7 @@ class DataXceiver extends Receiver imple
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
-      writeResponse(Status.SUCCESS, out);
+      writeResponse(Status.SUCCESS, null, out);
     } finally {
       IOUtils.closeStream(out);
     }
@@ -577,16 +581,17 @@ class DataXceiver extends Receiver imple
         LOG.warn("Invalid access token in request from " + remoteAddress
             + " for OP_COPY_BLOCK for block " + block + " : "
             + e.getLocalizedMessage());
-        sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+        sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", datanode.socketWriteTimeout);
         return;
       }
 
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
-      LOG.info("Not able to copy block " + block.getBlockId() + " to " 
-          + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
-      sendResponse(s, ERROR, datanode.socketWriteTimeout);
+      String msg = "Not able to copy block " + block.getBlockId() + " to " 
+      + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
+      LOG.info(msg);
+      sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
       return;
     }
 
@@ -596,8 +601,8 @@ class DataXceiver extends Receiver imple
 
     try {
       // check if the block exists or not
-      blockSender = new BlockSender(block, 0, -1, false, false, false, 
-          datanode, null);
+      blockSender = new BlockSender(block, 0, -1, false, false, datanode, 
+          null);
 
       // set up response stream
       OutputStream baseStream = NetUtils.getOutputStream(
@@ -606,7 +611,7 @@ class DataXceiver extends Receiver imple
           baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
 
       // send status first
-      writeResponse(SUCCESS, reply);
+      writeSuccessWithChecksumInfo(blockSender, reply);
       // send block content to the target
       long read = blockSender.sendBlock(reply, baseStream, 
                                         dataXceiverServer.balanceThrottler);
@@ -653,21 +658,24 @@ class DataXceiver extends Receiver imple
         LOG.warn("Invalid access token in request from " + remoteAddress
             + " for OP_REPLACE_BLOCK for block " + block + " : "
             + e.getLocalizedMessage());
-        sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+        sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
+            datanode.socketWriteTimeout);
         return;
       }
     }
 
     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
-      LOG.warn("Not able to receive block " + block.getBlockId() + " from " 
-          + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
-      sendResponse(s, ERROR, datanode.socketWriteTimeout);
+      String msg = "Not able to receive block " + block.getBlockId() + " from " 
+          + s.getRemoteSocketAddress() + " because threads quota is exceeded."; 
+      LOG.warn(msg);
+      sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
       return;
     }
 
     Socket proxySock = null;
     DataOutputStream proxyOut = null;
     Status opStatus = SUCCESS;
+    String errMsg = null;
     BlockReceiver blockReceiver = null;
     DataInputStream proxyReply = null;
     
@@ -702,11 +710,16 @@ class DataXceiver extends Receiver imple
         throw new IOException("Copy block " + block + " from "
             + proxySock.getRemoteSocketAddress() + " failed");
       }
+      
+      // get checksum info about the block we're copying
+      ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
+      DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
+          checksumInfo.getChecksum());
       // open a block receiver and check if the block does not exist
       blockReceiver = new BlockReceiver(
           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
           proxySock.getLocalSocketAddress().toString(),
-          null, 0, 0, 0, "", null, datanode);
+          null, 0, 0, 0, "", null, datanode, remoteChecksum);
 
       // receive a block
       blockReceiver.receiveBlock(null, null, null, null, 
@@ -720,7 +733,8 @@ class DataXceiver extends Receiver imple
       
     } catch (IOException ioe) {
       opStatus = ERROR;
-      LOG.info("opReplaceBlock " + block + " received exception " + ioe);
+      errMsg = "opReplaceBlock " + block + " received exception " + ioe; 
+      LOG.info(errMsg);
       throw ioe;
     } finally {
       // receive the last byte that indicates the proxy released its thread resource
@@ -736,7 +750,7 @@ class DataXceiver extends Receiver imple
       
       // send response back
       try {
-        sendResponse(s, opStatus, datanode.socketWriteTimeout);
+        sendResponse(s, opStatus, errMsg, datanode.socketWriteTimeout);
       } catch (IOException ioe) {
         LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
       }
@@ -759,20 +773,41 @@ class DataXceiver extends Receiver imple
    * @param opStatus status message to write
    * @param timeout send timeout
    **/
-  private void sendResponse(Socket s, Status status,
+  private static void sendResponse(Socket s, Status status, String message,
       long timeout) throws IOException {
-    DataOutputStream reply = 
-      new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+    DataOutputStream reply = getStreamWithTimeout(s, timeout);
     
-    writeResponse(status, reply);
+    writeResponse(status, message, reply);
   }
   
-  private void writeResponse(Status status, OutputStream out)
+  private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
+      throws IOException {
+    return new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+  }
+
+  private static void writeResponse(Status status, String message, OutputStream out)
   throws IOException {
+    BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
+      .setStatus(status);
+    if (message != null) {
+      response.setMessage(message);
+    }
+    response.build().writeDelimitedTo(out);
+    out.flush();
+  }
+  
+  private void writeSuccessWithChecksumInfo(BlockSender blockSender,
+      DataOutputStream out) throws IOException {
+
+    ReadOpChecksumInfoProto ckInfo = ReadOpChecksumInfoProto.newBuilder()
+      .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum()))
+      .setChunkOffset(blockSender.getOffset())
+      .build();
+      
     BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
-      .setStatus(status)
+      .setStatus(SUCCESS)
+      .setReadOpChecksumInfo(ckInfo)
       .build();
-    
     response.writeDelimitedTo(out);
     out.flush();
   }
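
The reworked writeResponse()/writeSuccessWithChecksumInfo() pair above switches the op replies to length-delimited protobufs and lets error paths carry a human-readable message. A stripped-down sketch of the status-only variant, using the same proto types imported in the diff; the success path additionally sets a ReadOpChecksumInfoProto as shown above:

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;

class ResponseWriterSketch {
  /** Write a status (and optional message) as one delimited BlockOpResponseProto. */
  static void writeResponse(Status status, String message, DataOutputStream out)
      throws IOException {
    BlockOpResponseProto.Builder response =
        BlockOpResponseProto.newBuilder().setStatus(status);
    if (message != null) {
      response.setMessage(message);   // surfaced to the peer instead of a bare status code
    }
    response.build().writeDelimitedTo(out);
    out.flush();
  }
}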

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Wed Nov  2 05:34:31 2011
@@ -30,7 +30,6 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.balancer.Balancer;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
@@ -132,17 +131,12 @@ class DataXceiverServer implements Runna
   @Override
   public void run() {
     while (datanode.shouldRun) {
+      Socket s = null;
       try {
-        Socket s = ss.accept();
+        s = ss.accept();
         s.setTcpNoDelay(true);
-        final DataXceiver exciver;
-        try {
-          exciver = new DataXceiver(s, datanode, this);
-        } catch(IOException e) {
-          IOUtils.closeSocket(s);
-          throw e;
-        }
-        new Daemon(datanode.threadGroup, exciver).start();
+        new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this))
+            .start();
       } catch (SocketTimeoutException ignored) {
         // wake up to see if should continue to run
       } catch (AsynchronousCloseException ace) {
@@ -152,7 +146,19 @@ class DataXceiverServer implements Runna
           LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ace);
         }
       } catch (IOException ie) {
+        IOUtils.closeSocket(s);
         LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ie);
+      } catch (OutOfMemoryError ie) {
+        IOUtils.closeSocket(s);
+        // DataNode can run out of memory if there are too many transfers.
+        // Log the event and sleep for 30 seconds; other transfers may complete
+        // by then.
+        LOG.warn("DataNode is out of memory. Will retry in 30 seconds.", ie);
+        try {
+          Thread.sleep(30 * 1000);
+        } catch (InterruptedException e) {
+          // ignore
+        }
       } catch (Throwable te) {
         LOG.error(datanode.getMachineName()
             + ":DataXceiverServer: Exiting due to: ", te);

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Nov  2 05:34:31 2011
@@ -59,7 +59,6 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.util.MBeans;
@@ -1263,8 +1262,8 @@ public class FSDataset implements FSData
       throws IOException {
     File f = validateBlockFile(bpid, b);
     if(f == null) {
-      if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
-        InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
+      if (DataNode.LOG.isDebugEnabled()) {
+        DataNode.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
       }
       throw new IOException("Block " + b + " is not valid.");
     }
@@ -2003,8 +2002,8 @@ public class FSDataset implements FSData
       datanode.checkDiskError();
     }
     
-    if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
-      InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
+    if (DataNode.LOG.isDebugEnabled()) {
+      DataNode.LOG.debug("b=" + b + ", f=" + f);
     }
     return null;
   }
@@ -2088,10 +2087,9 @@ public class FSDataset implements FSData
         volumeMap.remove(bpid, invalidBlks[i]);
       }
       File metaFile = getMetaFile(f, invalidBlks[i].getGenerationStamp());
-      long dfsBytes = f.length() + metaFile.length();
 
       // Delete the block asynchronously to make sure we can do it fast enough
-      asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes,
+      asyncDiskService.deleteAsync(v, f, metaFile,
           new ExtendedBlock(bpid, invalidBlks[i]));
     }
     if (error) {

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Wed Nov  2 05:34:31 2011
@@ -152,11 +152,11 @@ class FSDatasetAsyncDiskService {
    * dfsUsed statistics accordingly.
    */
   void deleteAsync(FSDataset.FSVolume volume, File blockFile, File metaFile,
-      long dfsBytes, ExtendedBlock block) {
+      ExtendedBlock block) {
     DataNode.LOG.info("Scheduling block " + block.getLocalBlock().toString()
         + " file " + blockFile + " for deletion");
     ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(dataset,
-        volume, blockFile, metaFile, dfsBytes, block);
+        volume, blockFile, metaFile, block);
     execute(volume.getCurrentDir(), deletionTask);
   }
   
@@ -168,16 +168,14 @@ class FSDatasetAsyncDiskService {
     final FSDataset.FSVolume volume;
     final File blockFile;
     final File metaFile;
-    final long dfsBytes;
     final ExtendedBlock block;
     
     ReplicaFileDeleteTask(FSDataset dataset, FSDataset.FSVolume volume, File blockFile,
-        File metaFile, long dfsBytes, ExtendedBlock block) {
+        File metaFile, ExtendedBlock block) {
       this.dataset = dataset;
       this.volume = volume;
       this.blockFile = blockFile;
       this.metaFile = metaFile;
-      this.dfsBytes = dfsBytes;
       this.block = block;
     }
     
@@ -195,6 +193,7 @@ class FSDatasetAsyncDiskService {
 
     @Override
     public void run() {
+      long dfsBytes = blockFile.length() + metaFile.length();
       if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
         DataNode.LOG.warn("Unexpected error trying to delete block "
             + block.getBlockPoolId() + " " + block.getLocalBlock().toString()
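
With the change above, the freed byte count is computed inside the deletion task's run() rather than captured when the deletion is scheduled, so the figure reflects the files as they are at delete time. A bare-bones sketch of that shape; the class name and the accounting hook are illustrative:

import java.io.File;

class DeleteTaskSketch implements Runnable {
  private final File blockFile;
  private final File metaFile;

  DeleteTaskSketch(File blockFile, File metaFile) {
    this.blockFile = blockFile;
    this.metaFile = metaFile;
  }

  @Override
  public void run() {
    // Sample the sizes immediately before deleting, as the patch now does.
    long dfsBytes = blockFile.length() + metaFile.length();
    boolean ok = blockFile.delete() && (metaFile.delete() || !metaFile.exists());
    if (ok) {
      // a real implementation would decrement the volume's dfsUsed by dfsBytes here
    }
  }
}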


