Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/BlockTokenIdentifier.java Wed Nov 2 05:34:31 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@InterfaceAudience.Private
@@ -171,4 +172,12 @@ public class BlockTokenIdentifier extend
return cache;
}
+
+ @InterfaceAudience.Private
+ public static class Renewer extends Token.TrivialRenewer {
+ @Override
+ protected Text getKind() {
+ return KIND_NAME;
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/block/ExportedBlockKeys.java Wed Nov 2 05:34:31 2011
@@ -43,7 +43,7 @@ public class ExportedBlockKeys implement
this(false, 0, 0, new BlockKey(), new BlockKey[0]);
}
- ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
+ public ExportedBlockKeys(boolean isBlockTokenEnabled, long keyUpdateInterval,
long tokenLifetime, BlockKey currentKey, BlockKey[] allKeys) {
this.isBlockTokenEnabled = isBlockTokenEnabled;
this.keyUpdateInterval = keyUpdateInterval;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenSelector.java Wed Nov 2 05:34:31 2011
@@ -17,7 +17,16 @@
*/
package org.apache.hadoop.hdfs.security.token.delegation;
+import java.net.InetSocketAddress;
+
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
/**
@@ -26,6 +35,30 @@ import org.apache.hadoop.security.token.
@InterfaceAudience.Private
public class DelegationTokenSelector
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier>{
+ public static final String SERVICE_NAME_KEY = "hdfs.service.host_";
+
+ private static final DelegationTokenSelector INSTANCE = new DelegationTokenSelector();
+
+ /** Select the delegation token for hdfs from the ugi. */
+ public static Token<DelegationTokenIdentifier> selectHdfsDelegationToken(
+ final InetSocketAddress nnAddr, final UserGroupInformation ugi,
+ final Configuration conf) {
+ // this guesses the remote cluster's rpc service port.
+ // the current token design assumes it's the same as the local cluster's
+ // rpc port unless a config key is set. there should be a way to automatically
+ // and correctly determine the value
+ final String key = SERVICE_NAME_KEY + SecurityUtil.buildTokenService(nnAddr);
+ final String nnServiceName = conf.get(key);
+
+ int nnRpcPort = NameNode.DEFAULT_PORT;
+ if (nnServiceName != null) {
+ nnRpcPort = NetUtils.createSocketAddr(nnServiceName, nnRpcPort).getPort();
+ }
+
+ final Text serviceName = SecurityUtil.buildTokenService(
+ new InetSocketAddress(nnAddr.getHostName(), nnRpcPort));
+ return INSTANCE.selectToken(serviceName, ugi.getTokens());
+ }
public DelegationTokenSelector() {
super(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java Wed Nov 2 05:34:31 2011
@@ -358,7 +358,8 @@ public class Balancer {
if (response.getStatus() != Status.SUCCESS) {
if (response.getStatus() == Status.ERROR_ACCESS_TOKEN)
throw new IOException("block move failed due to access token error");
- throw new IOException("block move is failed");
+ throw new IOException("block move is failed: " +
+ response.getMessage());
}
}
@@ -823,7 +824,7 @@ public class Balancer {
cluster.add(datanode);
BalancerDatanode datanodeS;
final double avg = policy.getAvgUtilization();
- if (policy.getUtilization(datanode) > avg) {
+ if (policy.getUtilization(datanode) >= avg) {
datanodeS = new Source(datanode, policy, threshold);
if (isAboveAvgUtilized(datanodeS)) {
this.aboveAvgUtilizedDatanodes.add((Source)datanodeS);
@@ -1261,12 +1262,12 @@ public class Balancer {
return datanode.utilization > (policy.getAvgUtilization()+threshold);
}
- /* Return true if the given datanode is above average utilized
+ /* Return true if the given datanode's utilization is at or above average
* but not overUtilized */
private boolean isAboveAvgUtilized(BalancerDatanode datanode) {
final double avg = policy.getAvgUtilization();
return (datanode.utilization <= (avg+threshold))
- && (datanode.utilization > avg);
+ && (datanode.utilization >= avg);
}
/* Return true if the given datanode is underUtilized */
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Nov 2 05:34:31 2011
@@ -53,7 +53,7 @@ import org.apache.hadoop.hdfs.security.t
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.INodeFileUnderConstruction;
@@ -72,8 +72,6 @@ import com.google.common.annotations.Vis
/**
* Keeps information related to the blocks stored in the Hadoop cluster.
- * This class is a helper class for {@link FSNamesystem} and requires several
- * methods to be called with lock held on {@link FSNamesystem}.
*/
@InterfaceAudience.Private
public class BlockManager {
@@ -176,15 +174,16 @@ public class BlockManager {
/** for block replicas placement */
private BlockPlacementPolicy blockplacement;
- public BlockManager(FSNamesystem fsn, Configuration conf) throws IOException {
- namesystem = fsn;
- datanodeManager = new DatanodeManager(this, fsn, conf);
+ public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
+ final Configuration conf) throws IOException {
+ this.namesystem = namesystem;
+ datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
invalidateBlocks = new InvalidateBlocks(datanodeManager);
blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
blockplacement = BlockPlacementPolicy.getInstance(
- conf, fsn, datanodeManager.getNetworkTopology());
+ conf, stats, datanodeManager.getNetworkTopology());
pendingReplications = new PendingReplicationBlocks(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT) * 1000L);
@@ -2580,7 +2579,7 @@ public class BlockManager {
workFound = this.computeReplicationWork(blocksToProcess);
- // Update FSNamesystemMetrics counters
+ // Update counters
namesystem.writeLock();
try {
this.updateState();
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Nov 2 05:34:31 2011
@@ -50,8 +50,8 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
@@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.util.CyclicIteration;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
@@ -78,7 +79,7 @@ import org.apache.hadoop.util.Reflection
public class DatanodeManager {
static final Log LOG = LogFactory.getLog(DatanodeManager.class);
- private final FSNamesystem namesystem;
+ private final Namesystem namesystem;
private final BlockManager blockManager;
private final HeartbeatManager heartbeatManager;
@@ -124,12 +125,12 @@ public class DatanodeManager {
final int blockInvalidateLimit;
DatanodeManager(final BlockManager blockManager,
- final FSNamesystem namesystem, final Configuration conf
+ final Namesystem namesystem, final Configuration conf
) throws IOException {
this.namesystem = namesystem;
this.blockManager = blockManager;
- this.heartbeatManager = new HeartbeatManager(namesystem, conf);
+ this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
this.hostsReader = new HostsFileReader(
conf.get(DFSConfigKeys.DFS_HOSTS, ""),
@@ -163,7 +164,8 @@ public class DatanodeManager {
private Daemon decommissionthread = null;
void activate(final Configuration conf) {
- this.decommissionthread = new Daemon(new DecommissionManager(namesystem).new Monitor(
+ final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
+ this.decommissionthread = new Daemon(dm.new Monitor(
conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY,
@@ -859,7 +861,7 @@ public class DatanodeManager {
try {
nodeinfo = getDatanode(nodeReg);
} catch(UnregisteredNodeException e) {
- return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+ return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
// Check if this datanode should actually be shutdown instead.
@@ -869,7 +871,7 @@ public class DatanodeManager {
}
if (nodeinfo == null || !nodeinfo.isAlive) {
- return new DatanodeCommand[]{DatanodeCommand.REGISTER};
+ return new DatanodeCommand[]{RegisterCommand.REGISTER};
}
heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Wed Nov 2 05:34:31 2011
@@ -56,4 +56,7 @@ public interface DatanodeStatistics {
* The block related entries are set to -1.
*/
public long[] getStats();
+
+ /** @return the number of expired heartbeats */
+ public int getExpiredHeartbeats();
}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java Wed Nov 2 05:34:31 2011
@@ -23,7 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
/**
* Manage node decommissioning.
@@ -33,10 +33,13 @@ import org.apache.hadoop.hdfs.server.nam
class DecommissionManager {
static final Log LOG = LogFactory.getLog(DecommissionManager.class);
- private final FSNamesystem fsnamesystem;
+ private final Namesystem namesystem;
+ private final BlockManager blockmanager;
- DecommissionManager(final FSNamesystem namesystem) {
- this.fsnamesystem = namesystem;
+ DecommissionManager(final Namesystem namesystem,
+ final BlockManager blockmanager) {
+ this.namesystem = namesystem;
+ this.blockmanager = blockmanager;
}
/** Periodically check decommission status. */
@@ -61,12 +64,12 @@ class DecommissionManager {
*/
@Override
public void run() {
- for(; fsnamesystem.isRunning(); ) {
- fsnamesystem.writeLock();
+ for(; namesystem.isRunning(); ) {
+ namesystem.writeLock();
try {
check();
} finally {
- fsnamesystem.writeUnlock();
+ namesystem.writeUnlock();
}
try {
@@ -78,7 +81,7 @@ class DecommissionManager {
}
private void check() {
- final DatanodeManager dm = fsnamesystem.getBlockManager().getDatanodeManager();
+ final DatanodeManager dm = blockmanager.getDatanodeManager();
int count = 0;
for(Map.Entry<String, DatanodeDescriptor> entry
: dm.getDatanodeCyclicIteration(firstkey)) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Wed Nov 2 05:34:31 2011
@@ -27,7 +27,7 @@ import org.apache.hadoop.hdfs.DFSConfigK
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.common.Util;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
import org.apache.hadoop.util.Daemon;
/**
@@ -55,14 +55,17 @@ class HeartbeatManager implements Datano
/** Heartbeat monitor thread */
private final Daemon heartbeatThread = new Daemon(new Monitor());
- final FSNamesystem namesystem;
+ final Namesystem namesystem;
+ final BlockManager blockManager;
- HeartbeatManager(final FSNamesystem namesystem, final Configuration conf) {
+ HeartbeatManager(final Namesystem namesystem, final BlockManager blockManager,
+ final Configuration conf) {
this.heartbeatRecheckInterval = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT); // 5 minutes
this.namesystem = namesystem;
+ this.blockManager = blockManager;
}
void activate(Configuration conf) {
@@ -136,6 +139,11 @@ class HeartbeatManager implements Datano
getBlockPoolUsed()};
}
+ @Override
+ public synchronized int getExpiredHeartbeats() {
+ return stats.expiredHeartbeats;
+ }
+
synchronized void register(final DatanodeDescriptor d) {
if (!datanodes.contains(d)) {
addDatanode(d);
@@ -191,7 +199,7 @@ class HeartbeatManager implements Datano
* effect causes more datanodes to be declared dead.
*/
void heartbeatCheck() {
- final DatanodeManager dm = namesystem.getBlockManager().getDatanodeManager();
+ final DatanodeManager dm = blockManager.getDatanodeManager();
// It's OK to check safe mode w/o taking the lock here, we re-check
// for safe mode after taking the lock before removing a datanode.
if (namesystem.isInSafeMode()) {
@@ -204,7 +212,7 @@ class HeartbeatManager implements Datano
synchronized(this) {
for (DatanodeDescriptor d : datanodes) {
if (dm.isDatanodeDead(d)) {
- namesystem.incrExpiredHeartbeats();
+ stats.incrExpiredHeartbeats();
dead = d;
break;
}
@@ -244,8 +252,7 @@ class HeartbeatManager implements Datano
heartbeatCheck();
lastHeartbeatCheck = now;
}
- if (namesystem.getBlockManager().shouldUpdateBlockKey(
- now - lastBlockKeyUpdate)) {
+ if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
synchronized(HeartbeatManager.this) {
for(DatanodeDescriptor d : datanodes) {
d.needKeyUpdate = true;
@@ -274,6 +281,8 @@ class HeartbeatManager implements Datano
private long blockPoolUsed = 0L;
private int xceiverCount = 0;
+ private int expiredHeartbeats = 0;
+
private void add(final DatanodeDescriptor node) {
capacityUsed += node.getDfsUsed();
blockPoolUsed += node.getBlockPoolUsed();
@@ -297,5 +306,10 @@ class HeartbeatManager implements Datano
capacityTotal -= node.getDfsUsed();
}
}
+
+ /** Increment expired heartbeat counter. */
+ private void incrExpiredHeartbeats() {
+ expiredHeartbeats++;
+ }
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java Wed Nov 2 05:34:31 2011
@@ -26,19 +26,66 @@ import java.util.TreeSet;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-/** Keep track of under replication blocks.
- * Blocks have replication priority, with priority 0 indicating the highest
- * Blocks have only one replicas has the highest
+/**
+ * Keep prioritized queues of under replicated blocks.
+ * Blocks have replication priority, with priority {@link #QUEUE_HIGHEST_PRIORITY}
+ * indicating the highest priority.
+ * <p>
+ * Having a prioritised queue allows the {@link BlockManager} to select
+ * which blocks to replicate first: it tries to give priority to data
+ * that is most at risk or considered most valuable.
+ * </p>
+ * <p>
+ * The policy for choosing which priority to give added blocks
+ * is implemented in {@link #getPriority(Block, int, int, int)}.
+ * </p>
+ * <p>The queue order is as follows:</p>
+ * <ol>
+ * <li>{@link #QUEUE_HIGHEST_PRIORITY}: the blocks that must be replicated
+ * first. That is blocks with only one copy, or blocks with zero live
+ * copies but a copy in a node being decommissioned. These blocks
+ * are at risk of loss if the disk or server on which they
+ * remain fails.</li>
+ * <li>{@link #QUEUE_VERY_UNDER_REPLICATED}: blocks that are very
+ * under-replicated compared to their expected values. Currently
+ * that means the ratio of actual:expected replicas is
+ * <i>less than</i> 1:3. These blocks may not be at risk,
+ * but they are clearly considered "important".</li>
+ * <li>{@link #QUEUE_UNDER_REPLICATED}: blocks that are also under
+ * replicated, and the ratio of actual:expected is good enough that
+ * they do not need to go into the {@link #QUEUE_VERY_UNDER_REPLICATED}
+ * queue.</li>
+ * <li>{@link #QUEUE_REPLICAS_BADLY_DISTRIBUTED}: there are at least as
+ * many copies of a block as required, but the blocks are not adequately
+ * distributed. Loss of a rack/switch could take all copies off-line.</li>
+ * <li>{@link #QUEUE_WITH_CORRUPT_BLOCKS}: This is for blocks that are corrupt
+ * and for which there are no non-corrupt copies (currently) available.
+ * The policy here is to keep those corrupt blocks replicated, but give
+ * blocks that are not corrupt higher priority.</li>
+ * </ol>
*/
class UnderReplicatedBlocks implements Iterable<Block> {
+ /** The total number of queues : {@value} */
static final int LEVEL = 5;
+ /** The queue with the highest priority: {@value} */
+ static final int QUEUE_HIGHEST_PRIORITY = 0;
+ /** The queue for blocks that are way below their expected value : {@value} */
+ static final int QUEUE_VERY_UNDER_REPLICATED = 1;
+ /** The queue for "normally" under-replicated blocks: {@value} */
+ static final int QUEUE_UNDER_REPLICATED = 2;
+ /** The queue for blocks that have the right number of replicas,
+ * but which the block manager felt were badly distributed: {@value}
+ */
+ static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
+ /** The queue for corrupt blocks: {@value} */
static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
+ /** the queues themselves */
private final List<NavigableSet<Block>> priorityQueues
- = new ArrayList<NavigableSet<Block>>();
-
+ = new ArrayList<NavigableSet<Block>>(LEVEL);
+
/** Create an object. */
UnderReplicatedBlocks() {
- for(int i=0; i<LEVEL; i++) {
+ for (int i = 0; i < LEVEL; i++) {
priorityQueues.add(new TreeSet<Block>());
}
}
@@ -47,7 +94,7 @@ class UnderReplicatedBlocks implements I
* Empty the queues.
*/
void clear() {
- for(int i=0; i<LEVEL; i++) {
+ for (int i = 0; i < LEVEL; i++) {
priorityQueues.get(i).clear();
}
}
@@ -55,7 +102,7 @@ class UnderReplicatedBlocks implements I
/** Return the total number of under replication blocks */
synchronized int size() {
int size = 0;
- for (int i=0; i<LEVEL; i++) {
+ for (int i = 0; i < LEVEL; i++) {
size += priorityQueues.get(i).size();
}
return size;
@@ -64,12 +111,14 @@ class UnderReplicatedBlocks implements I
/** Return the number of under replication blocks excluding corrupt blocks */
synchronized int getUnderReplicatedBlockCount() {
int size = 0;
- for (int i=0; i<QUEUE_WITH_CORRUPT_BLOCKS; i++) {
- size += priorityQueues.get(i).size();
+ for (int i = 0; i < LEVEL; i++) {
+ if (i != QUEUE_WITH_CORRUPT_BLOCKS) {
+ size += priorityQueues.get(i).size();
+ }
}
return size;
}
-
+
/** Return the number of corrupt blocks */
synchronized int getCorruptBlockSize() {
return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
@@ -77,47 +126,58 @@ class UnderReplicatedBlocks implements I
/** Check if a block is in the neededReplication queue */
synchronized boolean contains(Block block) {
- for(NavigableSet<Block> set : priorityQueues) {
- if(set.contains(block)) { return true; }
+ for (NavigableSet<Block> set : priorityQueues) {
+ if (set.contains(block)) {
+ return true;
+ }
}
return false;
}
-
+
/** Return the priority of a block
- * @param block a under replication block
+ * @param block a under replicated block
* @param curReplicas current number of replicas of the block
* @param expectedReplicas expected number of replicas of the block
+ * @return the priority for the blocks, between 0 and ({@link #LEVEL}-1)
*/
- private int getPriority(Block block,
+ private int getPriority(Block block,
int curReplicas,
int decommissionedReplicas,
int expectedReplicas) {
assert curReplicas >= 0 : "Negative replicas!";
if (curReplicas >= expectedReplicas) {
- return 3; // Block doesn't have enough racks
- } else if(curReplicas==0) {
- // If there are zero non-decommissioned replica but there are
+ // Block has enough copies, but not enough racks
+ return QUEUE_REPLICAS_BADLY_DISTRIBUTED;
+ } else if (curReplicas == 0) {
+ // If there are zero non-decommissioned replicas but there are
// some decommissioned replicas, then assign them highest priority
if (decommissionedReplicas > 0) {
- return 0;
+ return QUEUE_HIGHEST_PRIORITY;
}
- return QUEUE_WITH_CORRUPT_BLOCKS; // keep these blocks in needed replication.
- } else if(curReplicas==1) {
- return 0; // highest priority
- } else if(curReplicas*3<expectedReplicas) {
- return 1;
+ //all we have are corrupt blocks
+ return QUEUE_WITH_CORRUPT_BLOCKS;
+ } else if (curReplicas == 1) {
+ //only one replica - risk of loss
+ // highest priority
+ return QUEUE_HIGHEST_PRIORITY;
+ } else if ((curReplicas * 3) < expectedReplicas) {
+ //there are fewer than a third as many replicas as requested;
+ //this is considered very under-replicated
+ return QUEUE_VERY_UNDER_REPLICATED;
} else {
- return 2;
+ //add to the normal queue for under replicated blocks
+ return QUEUE_UNDER_REPLICATED;
}
}
-
+
/** add a block to a under replication queue according to its priority
* @param block a under replication block
* @param curReplicas current number of replicas of the block
+ * @param decomissionedReplicas the number of decommissioned replicas
* @param expectedReplicas expected number of replicas of the block
+ * @return true if the block was added to a queue.
*/
- synchronized boolean add(
- Block block,
+ synchronized boolean add(Block block,
int curReplicas,
int decomissionedReplicas,
int expectedReplicas) {
@@ -129,7 +189,7 @@ class UnderReplicatedBlocks implements I
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.add:"
+ block
- + " has only "+curReplicas
+ + " has only " + curReplicas
+ " replicas and need " + expectedReplicas
+ " replicas so is added to neededReplications"
+ " at priority level " + priLevel);
@@ -149,8 +209,22 @@ class UnderReplicatedBlocks implements I
oldExpectedReplicas);
return remove(block, priLevel);
}
-
- /** remove a block from a under replication queue given a priority*/
+
+ /**
+ * Remove a block from the under replication queues.
+ *
+ * The priLevel parameter is a hint of which queue to query
+ * first: if negative or >= {@link #LEVEL} this shortcutting
+ * is not attempted.
+ *
+ * If the block is not found in the nominated queue, an attempt is made to
+ * remove it from all queues.
+ *
+ * <i>Warning:</i> This is not a synchronized method.
+ * @param block block to remove
+ * @param priLevel expected priority level
+ * @return true if the block was found and removed from one of the priority queues
+ */
boolean remove(Block block, int priLevel) {
if(priLevel >= 0 && priLevel < LEVEL
&& priorityQueues.get(priLevel).remove(block)) {
@@ -164,8 +238,8 @@ class UnderReplicatedBlocks implements I
} else {
// Try to remove the block from all queues if the block was
// not found in the queue for the given priority level.
- for(int i=0; i<LEVEL; i++) {
- if(priorityQueues.get(i).remove(block)) {
+ for (int i = 0; i < LEVEL; i++) {
+ if (priorityQueues.get(i).remove(block)) {
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.remove: "
@@ -178,9 +252,24 @@ class UnderReplicatedBlocks implements I
}
return false;
}
-
- /** update the priority level of a block */
- synchronized void update(Block block, int curReplicas,
+
+ /**
+ * Recalculate and potentially update the priority level of a block.
+ *
+ * If the block priority has changed from before, an attempt is made to
+ * remove it from the block queue. Regardless of whether or not the block
+ * is in the block queue of the (recalculated) priority, an attempt is made
+ * to add it to that queue. This ensures that the block will be
+ * in its expected priority queue (and only that queue) by the end of the
+ * method call.
+ * @param block a under replicated block
+ * @param curReplicas current number of replicas of the block
+ * @param decommissionedReplicas the number of decommissioned replicas
+ * @param curExpectedReplicas expected number of replicas of the block
+ * @param curReplicasDelta the change in the replica count from before
+ * @param expectedReplicasDelta the change in the expected replica count from before
+ */
+ synchronized void update(Block block, int curReplicas,
int decommissionedReplicas,
int curExpectedReplicas,
int curReplicasDelta, int expectedReplicasDelta) {
@@ -206,7 +295,7 @@ class UnderReplicatedBlocks implements I
NameNode.stateChangeLog.debug(
"BLOCK* NameSystem.UnderReplicationBlock.update:"
+ block
- + " has only "+curReplicas
+ + " has only "+ curReplicas
+ " replicas and needs " + curExpectedReplicas
+ " replicas so is added to neededReplications"
+ " at priority level " + curPri);
@@ -218,17 +307,24 @@ class UnderReplicatedBlocks implements I
synchronized BlockIterator iterator(int level) {
return new BlockIterator(level);
}
-
+
/** return an iterator of all the under replication blocks */
+ @Override
public synchronized BlockIterator iterator() {
return new BlockIterator();
}
-
+
+ /**
+ * An iterator over blocks.
+ */
class BlockIterator implements Iterator<Block> {
private int level;
private boolean isIteratorForLevel = false;
private List<Iterator<Block>> iterators = new ArrayList<Iterator<Block>>();
+ /**
+ * Construct an iterator over all queues.
+ */
private BlockIterator() {
level=0;
for(int i=0; i<LEVEL; i++) {
@@ -236,6 +332,10 @@ class UnderReplicatedBlocks implements I
}
}
+ /**
+ * Construct an iterator for a single queue level.
+ * @param l the priority level to iterate over
+ */
private BlockIterator(int l) {
level = l;
isIteratorForLevel = true;
@@ -243,8 +343,9 @@ class UnderReplicatedBlocks implements I
}
private void update() {
- if (isIteratorForLevel)
+ if (isIteratorForLevel) {
return;
+ }
while(level< LEVEL-1 && !iterators.get(level).hasNext()) {
level++;
}
@@ -252,30 +353,33 @@ class UnderReplicatedBlocks implements I
@Override
public Block next() {
- if (isIteratorForLevel)
+ if (isIteratorForLevel) {
return iterators.get(0).next();
+ }
update();
return iterators.get(level).next();
}
@Override
public boolean hasNext() {
- if (isIteratorForLevel)
+ if (isIteratorForLevel) {
return iterators.get(0).hasNext();
+ }
update();
return iterators.get(level).hasNext();
}
@Override
public void remove() {
- if (isIteratorForLevel)
+ if (isIteratorForLevel) {
iterators.get(0).remove();
- else
+ } else {
iterators.get(level).remove();
+ }
}
int getPriority() {
return level;
}
- }
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java Wed Nov 2 05:34:31 2011
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
@@ -552,6 +553,13 @@ public class JspHelper {
DataInputStream in = new DataInputStream(buf);
DelegationTokenIdentifier id = new DelegationTokenIdentifier();
id.readFields(in);
+ if (context != null) {
+ final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
+ if (nn != null) {
+ // Verify the token.
+ nn.getNamesystem().verifyToken(id, token.getPassword());
+ }
+ }
ugi = id.getUser();
checkUsername(ugi.getShortUserName(), usernameFromQuery);
checkUsername(ugi.getShortUserName(), user);
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java Wed Nov 2 05:34:31 2011
@@ -403,8 +403,8 @@ class BlockPoolSliceScanner {
try {
adjustThrottler();
- blockSender = new BlockSender(block, 0, -1, false, false, true,
- datanode, null);
+ blockSender = new BlockSender(block, 0, -1, false, true, datanode,
+ null);
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Nov 2 05:34:31 2011
@@ -24,6 +24,7 @@ import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
+import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.PureJavaCrc32;
@@ -57,10 +59,13 @@ import org.apache.hadoop.util.PureJavaCr
class BlockReceiver implements Closeable {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
+
+ private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
private DataInputStream in = null; // from where data are read
private DataChecksum checksum; // from where chunks of a block can be read
private OutputStream out = null; // to block file at local disk
+ private FileDescriptor outFd;
private OutputStream cout = null; // output stream for cehcksum file
private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum;
@@ -80,6 +85,11 @@ class BlockReceiver implements Closeable
private final DataNode datanode;
volatile private boolean mirrorError;
+ // Cache management state
+ private boolean dropCacheBehindWrites;
+ private boolean syncBehindWrites;
+ private long lastCacheDropOffset = 0;
+
/** The client name. It is empty if a datanode is the client */
private final String clientname;
private final boolean isClient;
@@ -98,7 +108,8 @@ class BlockReceiver implements Closeable
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
- final DataNode datanode) throws IOException {
+ final DataNode datanode, DataChecksum requestedChecksum)
+ throws IOException {
try{
this.block = block;
this.in = in;
@@ -167,9 +178,11 @@ class BlockReceiver implements Closeable
}
}
// read checksum meta information
- this.checksum = DataChecksum.newDataChecksum(in);
+ this.checksum = requestedChecksum;
this.bytesPerChecksum = checksum.getBytesPerChecksum();
this.checksumSize = checksum.getChecksumSize();
+ this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
+ this.syncBehindWrites = datanode.shouldSyncBehindWrites();
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
@@ -177,6 +190,12 @@ class BlockReceiver implements Closeable
this.bytesPerChecksum, this.checksumSize);
if (streams != null) {
this.out = streams.dataOut;
+ if (out instanceof FileOutputStream) {
+ this.outFd = ((FileOutputStream)out).getFD();
+ } else {
+ LOG.warn("Could not get file descriptor for outputstream of class " +
+ out.getClass());
+ }
this.cout = streams.checksumOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
@@ -631,6 +650,8 @@ class BlockReceiver implements Closeable
);
datanode.metrics.incrBytesWritten(len);
+
+ dropOsCacheBehindWriter(offsetInBlock);
}
} catch (IOException iex) {
datanode.checkDiskError(iex);
@@ -645,10 +666,27 @@ class BlockReceiver implements Closeable
return lastPacketInBlock?-1:len;
}
- void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
- checksum.writeHeader(mirrorOut);
+ private void dropOsCacheBehindWriter(long offsetInBlock) throws IOException {
+ try {
+ if (outFd != null &&
+ offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
+ long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
+ if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
+ NativeIO.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
+ NativeIO.POSIX_FADV_DONTNEED);
+ }
+
+ if (syncBehindWrites) {
+ NativeIO.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES,
+ NativeIO.SYNC_FILE_RANGE_WRITE);
+ }
+
+ lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
+ }
+ } catch (Throwable t) {
+ LOG.warn("Couldn't drop os cache behind writer for " + block, t);
+ }
}
-
void receiveBlock(
DataOutputStream mirrOut, // output to next datanode
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Wed Nov 2 05:34:31 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.da
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
+import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
@@ -118,7 +122,9 @@ class BlockSender implements java.io.Clo
private DataInputStream checksumIn;
/** Checksum utility */
private final DataChecksum checksum;
- /** Starting position to read */
+ /** Initial position to read */
+ private long initialOffset;
+ /** Current position of read */
private long offset;
/** Position of last byte to read from block file */
private final long endOffset;
@@ -128,8 +134,6 @@ class BlockSender implements java.io.Clo
private final int checksumSize;
/** If true, failure to read checksum is ignored */
private final boolean corruptChecksumOk;
- /** true if chunk offset is needed to be sent in Checksum header */
- private final boolean chunkOffsetOK;
/** Sequence number of packet being sent */
private long seqno;
/** Set to true if transferTo is allowed for sending data to the client */
@@ -142,6 +146,24 @@ class BlockSender implements java.io.Clo
private final String clientTraceFmt;
private volatile ChunkChecksum lastChunkChecksum = null;
+ /** The file descriptor of the block being sent */
+ private FileDescriptor blockInFd;
+
+ // Cache-management related fields
+ private final long readaheadLength;
+ private boolean shouldDropCacheBehindRead;
+ private ReadaheadRequest curReadahead;
+ private long lastCacheDropOffset;
+ private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+ /**
+ * Minimum length of read below which management of the OS
+ * buffer cache is disabled.
+ */
+ private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
+
+ private static ReadaheadPool readaheadPool =
+ ReadaheadPool.getInstance();
+
/**
* Constructor
*
@@ -149,22 +171,22 @@ class BlockSender implements java.io.Clo
* @param startOffset starting offset to read from
* @param length length of data to read
* @param corruptChecksumOk
- * @param chunkOffsetOK need to send check offset in checksum header
* @param verifyChecksum verify checksum while reading the data
* @param datanode datanode from which the block is being read
* @param clientTraceFmt format string used to print client trace logs
* @throws IOException
*/
BlockSender(ExtendedBlock block, long startOffset, long length,
- boolean corruptChecksumOk, boolean chunkOffsetOK,
- boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
+ boolean corruptChecksumOk, boolean verifyChecksum,
+ DataNode datanode, String clientTraceFmt)
throws IOException {
try {
this.block = block;
- this.chunkOffsetOK = chunkOffsetOK;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt;
+ this.readaheadLength = datanode.getReadaheadLength();
+ this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
synchronized(datanode.data) {
this.replica = getReplica(block, datanode);
@@ -277,6 +299,11 @@ class BlockSender implements java.io.Clo
DataNode.LOG.debug("replica=" + replica);
}
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+ if (blockIn instanceof FileInputStream) {
+ blockInFd = ((FileInputStream)blockIn).getFD();
+ } else {
+ blockInFd = null;
+ }
} catch (IOException ioe) {
IOUtils.closeStream(this);
IOUtils.closeStream(blockIn);
@@ -288,6 +315,20 @@ class BlockSender implements java.io.Clo
* close opened files.
*/
public void close() throws IOException {
+ if (blockInFd != null && shouldDropCacheBehindRead) {
+ // drop the last few MB of the file from cache
+ try {
+ NativeIO.posixFadviseIfPossible(
+ blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
+ NativeIO.POSIX_FADV_DONTNEED);
+ } catch (Exception e) {
+ LOG.warn("Unable to drop cache on file close", e);
+ }
+ }
+ if (curReadahead != null) {
+ curReadahead.cancel();
+ }
+
IOException ioe = null;
if(checksumIn!=null) {
try {
@@ -304,6 +345,7 @@ class BlockSender implements java.io.Clo
ioe = e;
}
blockIn = null;
+ blockInFd = null;
}
// throw IOException if there is any
if(ioe!= null) {
@@ -538,14 +580,22 @@ class BlockSender implements java.io.Clo
if (out == null) {
throw new IOException( "out stream is null" );
}
- final long initialOffset = offset;
+ initialOffset = offset;
long totalRead = 0;
OutputStream streamForSendChunks = out;
+ lastCacheDropOffset = initialOffset;
+
+ if (isLongRead() && blockInFd != null) {
+ // Advise that this file descriptor will be accessed sequentially.
+ NativeIO.posixFadviseIfPossible(blockInFd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL);
+ }
+
+ // Trigger readahead of beginning of file if configured.
+ manageOsCache();
+
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
try {
- writeChecksumHeader(out);
-
int maxChunksPerPacket;
int pktSize = PacketHeader.PKT_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
@@ -569,6 +619,7 @@ class BlockSender implements java.io.Clo
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
while (endOffset > offset) {
+ manageOsCache();
long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
transferTo, throttler);
offset += len;
@@ -595,22 +646,45 @@ class BlockSender implements java.io.Clo
}
return totalRead;
}
-
+
/**
- * Write checksum header to the output stream
+ * Manage the OS buffer cache by performing read-ahead
+ * and drop-behind.
*/
- private void writeChecksumHeader(DataOutputStream out) throws IOException {
- try {
- checksum.writeHeader(out);
- if (chunkOffsetOK) {
- out.writeLong(offset);
+ private void manageOsCache() throws IOException {
+ if (!isLongRead() || blockInFd == null) {
+ // don't manage cache manually for short-reads, like
+ // HBase random read workloads.
+ return;
+ }
+
+ // Perform readahead if necessary
+ if (readaheadLength > 0 && readaheadPool != null) {
+ curReadahead = readaheadPool.readaheadStream(
+ clientTraceFmt, blockInFd,
+ offset, readaheadLength, Long.MAX_VALUE,
+ curReadahead);
+ }
+
+ // Drop what we've just read from cache, since we aren't
+ // likely to need it again
+ long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
+ if (shouldDropCacheBehindRead &&
+ offset >= nextCacheDropOffset) {
+ long dropLength = offset - lastCacheDropOffset;
+ if (dropLength >= 1024) {
+ NativeIO.posixFadviseIfPossible(blockInFd,
+ lastCacheDropOffset, dropLength,
+ NativeIO.POSIX_FADV_DONTNEED);
}
- out.flush();
- } catch (IOException e) { //socket error
- throw ioeToSocketException(e);
+ lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
}
}
-
+
+ private boolean isLongRead() {
+ return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
+ }
+
/**
* Write packet header into {@code pkt}
*/
@@ -624,4 +698,19 @@ class BlockSender implements java.io.Clo
boolean didSendEntireByteRange() {
return sentEntireByteRange;
}
+
+ /**
+ * @return the checksum type that will be used with this block transfer.
+ */
+ DataChecksum getChecksum() {
+ return checksum;
+ }
+
+ /**
+ * @return the offset into the block file where the sender is currently
+ * reading.
+ */
+ long getOffset() {
+ return offset;
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Wed Nov 2 05:34:31 2011
@@ -104,6 +104,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -123,6 +124,7 @@ import org.apache.hadoop.hdfs.protocol.d
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeProtocolServerSideTranslatorR23;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -150,12 +152,16 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
+import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
+import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolServerSideTranslatorR23;
+import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeProtocolTranslatorR23;
+import org.apache.hadoop.hdfs.server.protocolR23Compatible.InterDatanodeWireProtocol;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpServer;
@@ -413,6 +419,11 @@ public class DataNode extends Configured
int socketTimeout;
int socketWriteTimeout = 0;
boolean transferToAllowed = true;
+ private boolean dropCacheBehindWrites = false;
+ private boolean syncBehindWrites = false;
+ private boolean dropCacheBehindReads = false;
+ private long readaheadLength = 0;
+
int writePacketSize = 0;
boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
@@ -496,6 +507,20 @@ public class DataNode extends Configured
DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+
+ this.readaheadLength = conf.getLong(
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
+ DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+ this.dropCacheBehindWrites = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
+ this.syncBehindWrites = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
+ DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
+ this.dropCacheBehindReads = conf.getBoolean(
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
+ DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
+
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
this.initialBlockReportDelay = conf.getLong(
@@ -554,7 +579,7 @@ public class DataNode extends Configured
if (conf.getBoolean(DFS_WEBHDFS_ENABLED_KEY, DFS_WEBHDFS_ENABLED_DEFAULT)) {
infoServer.addJerseyResourcePackage(DatanodeWebHdfsMethods.class
.getPackage().getName() + ";" + Param.class.getPackage().getName(),
- "/" + WebHdfsFileSystem.PATH_PREFIX + "/*");
+ WebHdfsFileSystem.PATH_PREFIX + "/*");
}
this.infoServer.start();
}
@@ -576,13 +601,22 @@ public class DataNode extends Configured
InetSocketAddress ipcAddr = NetUtils.createSocketAddr(
conf.get("dfs.datanode.ipc.address"));
- // Add all the RPC protocols that the Datanode implements
- ipcServer = RPC.getServer(ClientDatanodeProtocol.class, this, ipcAddr.getHostName(),
+ // Add all the RPC protocols that the Datanode implements
+ ClientDatanodeProtocolServerSideTranslatorR23
+ clientDatanodeProtocolServerTranslator =
+ new ClientDatanodeProtocolServerSideTranslatorR23(this);
+ ipcServer = RPC.getServer(
+ org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol.class,
+ clientDatanodeProtocolServerTranslator, ipcAddr.getHostName(),
ipcAddr.getPort(),
conf.getInt(DFS_DATANODE_HANDLER_COUNT_KEY,
DFS_DATANODE_HANDLER_COUNT_DEFAULT),
false, conf, blockPoolTokenSecretManager);
- ipcServer.addProtocol(InterDatanodeProtocol.class, this);
+ InterDatanodeProtocolServerSideTranslatorR23
+ interDatanodeProtocolServerTranslator =
+ new InterDatanodeProtocolServerSideTranslatorR23(this);
+ ipcServer.addProtocol(InterDatanodeWireProtocol.class,
+ interDatanodeProtocolServerTranslator);
// set service-level authorization security policy
if (conf.getBoolean(
@@ -1137,8 +1171,15 @@ public class DataNode extends Configured
if (!heartbeatsDisabledForTests) {
DatanodeCommand[] cmds = sendHeartBeat();
metrics.addHeartbeat(now() - startTime);
+
+ long startProcessCommands = now();
if (!processCommand(cmds))
continue;
+ long endProcessCommands = now();
+ if (endProcessCommands - startProcessCommands > 2000) {
+ LOG.info("Took " + (endProcessCommands - startProcessCommands) +
+ "ms to process " + cmds.length + " commands from NN");
+ }
}
}
if (pendingReceivedRequests > 0
@@ -1412,7 +1453,7 @@ public class DataNode extends Configured
}
break;
case DatanodeProtocol.DNA_FINALIZE:
- storage.finalizeUpgrade(((DatanodeCommand.Finalize) cmd)
+ storage.finalizeUpgrade(((FinalizeCommand) cmd)
.getBlockPoolId());
break;
case UpgradeCommand.UC_ACTION_START_UPGRADE:
@@ -1634,15 +1675,13 @@ public class DataNode extends Configured
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr);
}
- UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
+ final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
try {
return loginUgi
.doAs(new PrivilegedExceptionAction<InterDatanodeProtocol>() {
public InterDatanodeProtocol run() throws IOException {
- return (InterDatanodeProtocol) RPC.getProxy(
- InterDatanodeProtocol.class, InterDatanodeProtocol.versionID,
- addr, UserGroupInformation.getCurrentUser(), conf,
- NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+ return new InterDatanodeProtocolTranslatorR23(addr, loginUgi,
+ conf, NetUtils.getDefaultSocketFactory(conf), socketTimeout);
}
});
} catch (InterruptedException ie) {
@@ -1878,7 +1917,7 @@ public class DataNode extends Configured
nn.reportBadBlocks(new LocatedBlock[]{
new LocatedBlock(block, new DatanodeInfo[] {
new DatanodeInfo(bpReg)})});
- LOG.info("Can't replicate block " + block
+ LOG.warn("Can't replicate block " + block
+ " because on-disk length " + onDiskLength
+ " is shorter than NameNode recorded length " + block.getNumBytes());
return;
@@ -2058,7 +2097,7 @@ public class DataNode extends Configured
out = new DataOutputStream(new BufferedOutputStream(baseStream,
HdfsConstants.SMALL_BUFFER_SIZE));
blockSender = new BlockSender(b, 0, b.getNumBytes(),
- false, false, false, DataNode.this, null);
+ false, false, DataNode.this, null);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
//
@@ -2071,7 +2110,7 @@ public class DataNode extends Configured
}
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
- stage, 0, 0, 0, 0);
+ stage, 0, 0, 0, 0, blockSender.getChecksum());
// send data & checksum
blockSender.sendBlock(out, baseStream, null);
@@ -2884,4 +2923,20 @@ public class DataNode extends Configured
(DataXceiverServer) this.dataXceiverServer.getRunnable();
return dxcs.balanceThrottler.getBandwidth();
}
+
+ long getReadaheadLength() {
+ return readaheadLength;
+ }
+
+ boolean shouldDropCacheBehindWrites() {
+ return dropCacheBehindWrites;
+ }
+
+ boolean shouldDropCacheBehindReads() {
+ return dropCacheBehindReads;
+ }
+
+ boolean shouldSyncBehindWrites() {
+ return syncBehindWrites;
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Wed Nov 2 05:34:31 2011
@@ -44,12 +44,16 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto.Builder;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProtoOrBuilder;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
@@ -92,7 +96,6 @@ class DataXceiver extends Receiver imple
this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode;
this.dataXceiverServer = dataXceiverServer;
- dataXceiverServer.childSockets.put(s, s);
remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString();
@@ -129,6 +132,7 @@ class DataXceiver extends Receiver imple
public void run() {
int opsProcessed = 0;
Op op = null;
+ dataXceiverServer.childSockets.put(s, s);
try {
int stdTimeout = s.getSoTimeout();
@@ -223,15 +227,17 @@ class DataXceiver extends Receiver imple
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
- true, true, false, datanode, clientTraceFmt);
+ true, false, datanode, clientTraceFmt);
} catch(IOException e) {
- LOG.info("opReadBlock " + block + " received exception " + e);
- sendResponse(s, ERROR, datanode.socketWriteTimeout);
+ String msg = "opReadBlock " + block + " received exception " + e;
+ LOG.info(msg);
+ sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
throw e;
}
// send op status
- sendResponse(s, SUCCESS, datanode.socketWriteTimeout);
+ writeSuccessWithChecksumInfo(blockSender,
+ getStreamWithTimeout(s, datanode.socketWriteTimeout));
long read = blockSender.sendBlock(out, baseStream, null); // send data
@@ -289,7 +295,8 @@ class DataXceiver extends Receiver imple
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
- final long latestGenerationStamp) throws IOException {
+ final long latestGenerationStamp,
+ DataChecksum requestedChecksum) throws IOException {
updateCurrentThreadName("Receiving block " + block + " client=" + clientname);
final boolean isDatanode = clientname.length() == 0;
final boolean isClient = !isDatanode;
@@ -348,7 +355,7 @@ class DataXceiver extends Receiver imple
s.getRemoteSocketAddress().toString(),
s.getLocalSocketAddress().toString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
- clientname, srcDataNode, datanode);
+ clientname, srcDataNode, datanode, requestedChecksum);
} else {
datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
}
@@ -378,11 +385,8 @@ class DataXceiver extends Receiver imple
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
clientname, targets, srcDataNode, stage, pipelineSize,
- minBytesRcvd, maxBytesRcvd, latestGenerationStamp);
+ minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
- if (blockReceiver != null) { // send checksum header
- blockReceiver.writeChecksumHeader(mirrorOut);
- }
mirrorOut.flush();
// read connect ack (only for clients, not for replication req)
@@ -452,7 +456,7 @@ class DataXceiver extends Receiver imple
if (LOG.isTraceEnabled()) {
LOG.trace("TRANSFER: send close-ack");
}
- writeResponse(SUCCESS, replyOut);
+ writeResponse(SUCCESS, null, replyOut);
}
}
@@ -507,7 +511,7 @@ class DataXceiver extends Receiver imple
NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
try {
datanode.transferReplicaForPipelineRecovery(blk, targets, clientName);
- writeResponse(Status.SUCCESS, out);
+ writeResponse(Status.SUCCESS, null, out);
} finally {
IOUtils.closeStream(out);
}
@@ -577,16 +581,17 @@ class DataXceiver extends Receiver imple
LOG.warn("Invalid access token in request from " + remoteAddress
+ " for OP_COPY_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
- sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+ sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token", datanode.socketWriteTimeout);
return;
}
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
- LOG.info("Not able to copy block " + block.getBlockId() + " to "
- + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
- sendResponse(s, ERROR, datanode.socketWriteTimeout);
+ String msg = "Not able to copy block " + block.getBlockId() + " to "
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
+ LOG.info(msg);
+ sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
return;
}
@@ -596,8 +601,8 @@ class DataXceiver extends Receiver imple
try {
// check if the block exists or not
- blockSender = new BlockSender(block, 0, -1, false, false, false,
- datanode, null);
+ blockSender = new BlockSender(block, 0, -1, false, false, datanode,
+ null);
// set up response stream
OutputStream baseStream = NetUtils.getOutputStream(
@@ -606,7 +611,7 @@ class DataXceiver extends Receiver imple
baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
// send status first
- writeResponse(SUCCESS, reply);
+ writeSuccessWithChecksumInfo(blockSender, reply);
// send block content to the target
long read = blockSender.sendBlock(reply, baseStream,
dataXceiverServer.balanceThrottler);
@@ -653,21 +658,24 @@ class DataXceiver extends Receiver imple
LOG.warn("Invalid access token in request from " + remoteAddress
+ " for OP_REPLACE_BLOCK for block " + block + " : "
+ e.getLocalizedMessage());
- sendResponse(s, ERROR_ACCESS_TOKEN, datanode.socketWriteTimeout);
+ sendResponse(s, ERROR_ACCESS_TOKEN, "Invalid access token",
+ datanode.socketWriteTimeout);
return;
}
}
if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
- LOG.warn("Not able to receive block " + block.getBlockId() + " from "
- + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
- sendResponse(s, ERROR, datanode.socketWriteTimeout);
+ String msg = "Not able to receive block " + block.getBlockId() + " from "
+ + s.getRemoteSocketAddress() + " because threads quota is exceeded.";
+ LOG.warn(msg);
+ sendResponse(s, ERROR, msg, datanode.socketWriteTimeout);
return;
}
Socket proxySock = null;
DataOutputStream proxyOut = null;
Status opStatus = SUCCESS;
+ String errMsg = null;
BlockReceiver blockReceiver = null;
DataInputStream proxyReply = null;
@@ -702,11 +710,16 @@ class DataXceiver extends Receiver imple
throw new IOException("Copy block " + block + " from "
+ proxySock.getRemoteSocketAddress() + " failed");
}
+
+ // get checksum info about the block we're copying
+ ReadOpChecksumInfoProto checksumInfo = copyResponse.getReadOpChecksumInfo();
+ DataChecksum remoteChecksum = DataTransferProtoUtil.fromProto(
+ checksumInfo.getChecksum());
// open a block receiver and check if the block does not exist
blockReceiver = new BlockReceiver(
block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
- null, 0, 0, 0, "", null, datanode);
+ null, 0, 0, 0, "", null, datanode, remoteChecksum);
// receive a block
blockReceiver.receiveBlock(null, null, null, null,
@@ -720,7 +733,8 @@ class DataXceiver extends Receiver imple
} catch (IOException ioe) {
opStatus = ERROR;
- LOG.info("opReplaceBlock " + block + " received exception " + ioe);
+ errMsg = "opReplaceBlock " + block + " received exception " + ioe;
+ LOG.info(errMsg);
throw ioe;
} finally {
// receive the last byte that indicates the proxy released its thread resource
@@ -736,7 +750,7 @@ class DataXceiver extends Receiver imple
// send response back
try {
- sendResponse(s, opStatus, datanode.socketWriteTimeout);
+ sendResponse(s, opStatus, errMsg, datanode.socketWriteTimeout);
} catch (IOException ioe) {
LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
}
@@ -759,20 +773,41 @@ class DataXceiver extends Receiver imple
* @param opStatus status message to write
* @param timeout send timeout
**/
- private void sendResponse(Socket s, Status status,
+ private static void sendResponse(Socket s, Status status, String message,
long timeout) throws IOException {
- DataOutputStream reply =
- new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+ DataOutputStream reply = getStreamWithTimeout(s, timeout);
- writeResponse(status, reply);
+ writeResponse(status, message, reply);
}
- private void writeResponse(Status status, OutputStream out)
+ private static DataOutputStream getStreamWithTimeout(Socket s, long timeout)
+ throws IOException {
+ return new DataOutputStream(NetUtils.getOutputStream(s, timeout));
+ }
+
+ private static void writeResponse(Status status, String message, OutputStream out)
throws IOException {
+ BlockOpResponseProto.Builder response = BlockOpResponseProto.newBuilder()
+ .setStatus(status);
+ if (message != null) {
+ response.setMessage(message);
+ }
+ response.build().writeDelimitedTo(out);
+ out.flush();
+ }
+
+ private void writeSuccessWithChecksumInfo(BlockSender blockSender,
+ DataOutputStream out) throws IOException {
+
+ ReadOpChecksumInfoProto ckInfo = ReadOpChecksumInfoProto.newBuilder()
+ .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum()))
+ .setChunkOffset(blockSender.getOffset())
+ .build();
+
BlockOpResponseProto response = BlockOpResponseProto.newBuilder()
- .setStatus(status)
+ .setStatus(SUCCESS)
+ .setReadOpChecksumInfo(ckInfo)
.build();
-
response.writeDelimitedTo(out);
out.flush();
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Wed Nov 2 05:34:31 2011
@@ -30,7 +30,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.balancer.Balancer;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
@@ -132,17 +131,12 @@ class DataXceiverServer implements Runna
@Override
public void run() {
while (datanode.shouldRun) {
+ Socket s = null;
try {
- Socket s = ss.accept();
+ s = ss.accept();
s.setTcpNoDelay(true);
- final DataXceiver exciver;
- try {
- exciver = new DataXceiver(s, datanode, this);
- } catch(IOException e) {
- IOUtils.closeSocket(s);
- throw e;
- }
- new Daemon(datanode.threadGroup, exciver).start();
+ new Daemon(datanode.threadGroup, new DataXceiver(s, datanode, this))
+ .start();
} catch (SocketTimeoutException ignored) {
// wake up to see if should continue to run
} catch (AsynchronousCloseException ace) {
@@ -152,7 +146,19 @@ class DataXceiverServer implements Runna
LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ace);
}
} catch (IOException ie) {
+ IOUtils.closeSocket(s);
LOG.warn(datanode.getMachineName() + ":DataXceiverServer: ", ie);
+ } catch (OutOfMemoryError ie) {
+ IOUtils.closeSocket(s);
+ // DataNode can run out of memory if there are too many transfers.
+ // Log the event and sleep for 30 seconds; other transfers may complete
+ // by then.
+ LOG.warn("DataNode is out of memory. Will retry in 30 seconds.", ie);
+ try {
+ Thread.sleep(30 * 1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
} catch (Throwable te) {
LOG.error(datanode.getMachineName()
+ ":DataXceiverServer: Exiting due to: ", te);
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Wed Nov 2 05:34:31 2011
@@ -59,7 +59,6 @@ import org.apache.hadoop.hdfs.server.com
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.util.MBeans;
@@ -1263,8 +1262,8 @@ public class FSDataset implements FSData
throws IOException {
File f = validateBlockFile(bpid, b);
if(f == null) {
- if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
- InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
}
throw new IOException("Block " + b + " is not valid.");
}
@@ -2003,8 +2002,8 @@ public class FSDataset implements FSData
datanode.checkDiskError();
}
- if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
- InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("b=" + b + ", f=" + f);
}
return null;
}
@@ -2088,10 +2087,9 @@ public class FSDataset implements FSData
volumeMap.remove(bpid, invalidBlks[i]);
}
File metaFile = getMetaFile(f, invalidBlks[i].getGenerationStamp());
- long dfsBytes = f.length() + metaFile.length();
// Delete the block asynchronously to make sure we can do it fast enough
- asyncDiskService.deleteAsync(v, f, metaFile, dfsBytes,
+ asyncDiskService.deleteAsync(v, f, metaFile,
new ExtendedBlock(bpid, invalidBlks[i]));
}
if (error) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetAsyncDiskService.java Wed Nov 2 05:34:31 2011
@@ -152,11 +152,11 @@ class FSDatasetAsyncDiskService {
* dfsUsed statistics accordingly.
*/
void deleteAsync(FSDataset.FSVolume volume, File blockFile, File metaFile,
- long dfsBytes, ExtendedBlock block) {
+ ExtendedBlock block) {
DataNode.LOG.info("Scheduling block " + block.getLocalBlock().toString()
+ " file " + blockFile + " for deletion");
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(dataset,
- volume, blockFile, metaFile, dfsBytes, block);
+ volume, blockFile, metaFile, block);
execute(volume.getCurrentDir(), deletionTask);
}
@@ -168,16 +168,14 @@ class FSDatasetAsyncDiskService {
final FSDataset.FSVolume volume;
final File blockFile;
final File metaFile;
- final long dfsBytes;
final ExtendedBlock block;
ReplicaFileDeleteTask(FSDataset dataset, FSDataset.FSVolume volume, File blockFile,
- File metaFile, long dfsBytes, ExtendedBlock block) {
+ File metaFile, ExtendedBlock block) {
this.dataset = dataset;
this.volume = volume;
this.blockFile = blockFile;
this.metaFile = metaFile;
- this.dfsBytes = dfsBytes;
this.block = block;
}
@@ -195,6 +193,7 @@ class FSDatasetAsyncDiskService {
@Override
public void run() {
+ long dfsBytes = blockFile.length() + metaFile.length();
if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
DataNode.LOG.warn("Unexpected error trying to delete block "
+ block.getBlockPoolId() + " " + block.getLocalBlock().toString()
|