hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject [2/2] hadoop git commit: HDFS-7923. The DataNodes should rate-limit their full block reports by asking the NN on heartbeat messages (cmccabe)
Date Fri, 12 Jun 2015 18:36:37 GMT
HDFS-7923. The DataNodes should rate-limit their full block reports by asking the NN on heartbeat messages (cmccabe)

(cherry picked from commit 12b5b06c063d93e6c683c9b6fac9a96912f59e59)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/378bb484
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/378bb484
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/378bb484

Branch: refs/heads/branch-2
Commit: 378bb484bbcb1921729cf88e718cccfc8986a716
Parents: e397cca
Author: Colin Patrick Mccabe <cmccabe@cloudera.com>
Authored: Fri Jun 12 11:17:51 2015 -0700
Committer: Colin Patrick Mccabe <cmccabe@cloudera.com>
Committed: Fri Jun 12 11:29:05 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   4 +
 .../DatanodeProtocolClientSideTranslatorPB.java |   8 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   3 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |   3 +-
 .../server/blockmanagement/BlockManager.java    |  41 ++-
 .../BlockManagerFaultInjector.java              |  52 +++
 .../BlockReportLeaseManager.java                | 355 +++++++++++++++++++
 .../server/blockmanagement/DatanodeManager.java |   2 +
 .../hdfs/server/datanode/BPServiceActor.java    |  71 +++-
 .../hadoop/hdfs/server/datanode/DNConf.java     |   4 +-
 .../hdfs/server/namenode/FSNamesystem.java      |  11 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |   9 +-
 .../server/protocol/BlockReportContext.java     |  25 +-
 .../hdfs/server/protocol/DatanodeProtocol.java  |   5 +-
 .../hdfs/server/protocol/HeartbeatResponse.java |  10 +-
 .../hdfs/server/protocol/RegisterCommand.java   |   2 +-
 .../src/main/proto/DatanodeProtocol.proto       |   6 +
 .../src/main/resources/hdfs-default.xml         |  21 ++
 .../hdfs/protocol/TestBlockListAsLongs.java     |   4 +-
 .../TestBlockReportRateLimiting.java            | 246 +++++++++++++
 .../blockmanagement/TestDatanodeManager.java    |  21 +-
 .../TestNameNodePrunesMissingStorages.java      |   2 +-
 .../server/datanode/TestBPOfferService.java     |   7 +-
 .../TestBlockHasMultipleReplicasOnSameDN.java   |   2 +-
 .../hdfs/server/datanode/TestBlockRecovery.java |   6 +-
 .../datanode/TestBpServiceActorScheduler.java   |   2 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   8 +-
 .../server/datanode/TestFsDatasetCache.java     |   9 +-
 .../TestNNHandlesBlockReportPerStorage.java     |   2 +-
 .../TestNNHandlesCombinedBlockReport.java       |   2 +-
 .../hdfs/server/datanode/TestStorageReport.java |   2 +-
 .../server/namenode/NNThroughputBenchmark.java  |   8 +-
 .../hdfs/server/namenode/NameNodeAdapter.java   |   2 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |   6 +-
 35 files changed, 893 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 0368f29..39dee78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -283,6 +283,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-8573. Move creation of restartMeta file logic from BlockReceiver to
     ReplicaInPipeline. (Eddy Xu via wang)
 
+    HDFS-7923. The DataNodes should rate-limit their full block reports by
+    asking the NN on heartbeat messages (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index dc0ecca..5295a2f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -436,6 +436,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final int     DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
   public static final String  DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY = "dfs.blockreport.split.threshold";
   public static final long    DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT = 1000 * 1000;
+  public static final String  DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES = "dfs.namenode.max.full.block.report.leases";
+  public static final int     DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES_DEFAULT = 6;
+  public static final String  DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS = "dfs.namenode.full.block.report.lease.length.ms";
+  public static final long    DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS_DEFAULT = 5L * 60L * 1000L;
   public static final String  DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec";
   public static final long    DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000;
   public static final String  DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index 825e835..94028a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -132,11 +132,13 @@ public class DatanodeProtocolClientSideTranslatorPB implements
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
       StorageReport[] reports, long cacheCapacity, long cacheUsed,
       int xmitsInProgress, int xceiverCount, int failedVolumes,
-      VolumeFailureSummary volumeFailureSummary) throws IOException {
+      VolumeFailureSummary volumeFailureSummary,
+      boolean requestFullBlockReportLease) throws IOException {
     HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
         .setRegistration(PBHelper.convert(registration))
         .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
-        .setFailedVolumes(failedVolumes);
+        .setFailedVolumes(failedVolumes)
+        .setRequestFullBlockReportLease(requestFullBlockReportLease);
     builder.addAllReports(PBHelper.convertStorageReports(reports));
     if (cacheCapacity != 0) {
       builder.setCacheCapacity(cacheCapacity);
@@ -165,7 +167,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       rollingUpdateStatus = PBHelper.convert(resp.getRollingUpgradeStatus());
     }
     return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()),
-        rollingUpdateStatus);
+        rollingUpdateStatus, resp.getFullBlockReportLeaseId());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index 873eb6d..e133ec7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -114,7 +114,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
           report, request.getCacheCapacity(), request.getCacheUsed(),
           request.getXmitsInProgress(),
           request.getXceiverCount(), request.getFailedVolumes(),
-          volumeFailureSummary);
+          volumeFailureSummary, request.getRequestFullBlockReportLease());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -135,6 +135,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
       builder.setRollingUpgradeStatus(PBHelper
           .convertRollingUpgradeStatus(rollingUpdateStatus));
     }
+    builder.setFullBlockReportLeaseId(response.getFullBlockReportLeaseId());
     return builder.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 97e7f4b..e46a3a5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -3043,7 +3043,7 @@ public class PBHelper {
 
   public static BlockReportContext convert(BlockReportContextProto proto) {
     return new BlockReportContext(proto.getTotalRpcs(),
-        proto.getCurRpc(), proto.getId());
+        proto.getCurRpc(), proto.getId(), proto.getLeaseId());
   }
 
   public static BlockReportContextProto convert(BlockReportContext context) {
@@ -3051,6 +3051,7 @@ public class PBHelper {
         setTotalRpcs(context.getTotalRpcs()).
         setCurRpc(context.getCurRpc()).
         setId(context.getReportId()).
+        setLeaseId(context.getLeaseId()).
         build();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 6f5b485..36b8cc7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
@@ -124,6 +125,7 @@ public class BlockManager {
   private final AtomicLong excessBlocksCount = new AtomicLong(0L);
   private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
   private final long startupDelayBlockDeletionInMs;
+  private final BlockReportLeaseManager blockReportLeaseManager;
 
   /** Used by metrics */
   public long getPendingReplicationBlocksCount() {
@@ -348,7 +350,8 @@ public class BlockManager {
     this.numBlocksPerIteration = conf.getInt(
         DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT,
         DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);
-    
+    this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
+
     LOG.info("defaultReplication         = " + defaultReplication);
     LOG.info("maxReplication             = " + maxReplication);
     LOG.info("minReplication             = " + minReplication);
@@ -1714,7 +1717,28 @@ public class BlockManager {
        */
     }
   }
-  
+
+  public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
+    assert namesystem.hasReadLock();
+    DatanodeDescriptor node = null;
+    try {
+      node = datanodeManager.getDatanode(nodeReg);
+    } catch (UnregisteredNodeException e) {
+      LOG.warn("Unregistered datanode {}", nodeReg);
+      return 0;
+    }
+    if (node == null) {
+      LOG.warn("Failed to find datanode {}", nodeReg);
+      return 0;
+    }
+    // Request a new block report lease.  The BlockReportLeaseManager has
+    // its own internal locking.
+    long leaseId = blockReportLeaseManager.requestLease(node);
+    BlockManagerFaultInjector.getInstance().
+        requestBlockReportLease(node, leaseId);
+    return leaseId;
+  }
+
   /**
    * StatefulBlockInfo is used to build the "toUC" list, which is a list of
    * updates to the information about under-construction blocks.
@@ -1819,6 +1843,12 @@ public class BlockManager {
             + " because namenode still in startup phase", nodeID);
         return !node.hasStaleStorages();
       }
+      if (context != null) {
+        if (!blockReportLeaseManager.checkLease(node, startTime,
+              context.getLeaseId())) {
+          return false;
+        }
+      }
 
       if (storageInfo.getBlockReportCount() == 0) {
         // The first block report can be processed a lot more efficiently than
@@ -1837,6 +1867,9 @@ public class BlockManager {
         if (lastStorageInRpc) {
           int rpcsSeen = node.updateBlockReportContext(context);
           if (rpcsSeen >= context.getTotalRpcs()) {
+            long leaseId = blockReportLeaseManager.removeLease(node);
+            BlockManagerFaultInjector.getInstance().
+                removeBlockReportLease(node, leaseId);
             List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
             if (zombies.isEmpty()) {
               LOG.debug("processReport 0x{}: no zombie storages found.",
@@ -3847,4 +3880,8 @@ public class BlockManager {
     clearQueues();
     blocksMap.clear();
   }
+
+  public BlockReportLeaseManager getBlockReportLeaseManager() {
+    return blockReportLeaseManager;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java
new file mode 100644
index 0000000..957c5c0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+
+/**
+ * Used to inject certain faults for testing.
+ */
+public class BlockManagerFaultInjector {
+  @VisibleForTesting
+  public static BlockManagerFaultInjector instance =
+      new BlockManagerFaultInjector();
+
+  @VisibleForTesting
+  public static BlockManagerFaultInjector getInstance() {
+    return instance;
+  }
+
+  @VisibleForTesting
+  public void incomingBlockReportRpc(DatanodeID nodeID,
+          BlockReportContext context) throws IOException {
+
+  }
+
+  @VisibleForTesting
+  public void requestBlockReportLease(DatanodeDescriptor node, long leaseId) {
+  }
+
+  @VisibleForTesting
+  public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
new file mode 100644
index 0000000..cd037f5
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java
@@ -0,0 +1,355 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * The BlockReportLeaseManager manages block report leases.<p/>
+ *
+ * DataNodes request BR leases from the NameNode by sending a heartbeat with
+ * the requestBlockReportLease field set.  The NameNode may choose to respond
+ * with a non-zero lease ID.  If so, that DataNode can send a block report with
+ * the given lease ID for the next few minutes.  The NameNode will accept
+ * these full block reports.<p/>
+ *
+ * BR leases limit the number of incoming full block reports to the NameNode
+ * at any given time.  For compatibility reasons, the NN will always accept
+ * block reports sent with a lease ID of 0 and queue them for processing
+ * immediately.  Full block reports which were manually triggered will also
+ * have a lease ID of 0, bypassing the rate-limiting.<p/>
+ *
+ * Block report leases expire after a certain amount of time.  This mechanism
+ * is in place so that a DN which dies while holding a lease does not
+ * permanently decrease the number of concurrent block reports which the NN is
+ * willing to accept.<p/>
+ *
+ * When considering which DNs to grant a BR lease, the NameNode gives priority
+ * to the DNs which have gone the longest without sending a full block
+ * report.<p/>
+ */
+class BlockReportLeaseManager {
+  static final Logger LOG =
+      LoggerFactory.getLogger(BlockReportLeaseManager.class);
+
+  private static class NodeData {
+    /**
+     * The UUID of the datanode.
+     */
+    final String datanodeUuid;
+
+    /**
+     * The lease ID, or 0 if there is no lease.
+     */
+    long leaseId;
+
+    /**
+     * The time when the lease was issued, or 0 if there is no lease.
+     */
+    long leaseTimeMs;
+
+    /**
+     * Previous element in the list.
+     */
+    NodeData prev;
+
+    /**
+     * Next element in the list.
+     */
+    NodeData next;
+
+    static NodeData ListHead(String name) {
+      NodeData node = new NodeData(name);
+      node.next = node;
+      node.prev = node;
+      return node;
+    }
+
+    NodeData(String datanodeUuid) {
+      this.datanodeUuid = datanodeUuid;
+    }
+
+    void removeSelf() {
+      if (this.prev != null) {
+        this.prev.next = this.next;
+      }
+      if (this.next != null) {
+        this.next.prev = this.prev;
+      }
+      this.next = null;
+      this.prev = null;
+    }
+
+    void addToEnd(NodeData node) {
+      Preconditions.checkState(node.next == null);
+      Preconditions.checkState(node.prev == null);
+      node.prev = this.prev;
+      node.next = this;
+      this.prev.next = node;
+      this.prev = node;
+    }
+
+    void addToBeginning(NodeData node) {
+      Preconditions.checkState(node.next == null);
+      Preconditions.checkState(node.prev == null);
+      node.next = this.next;
+      node.prev = this;
+      this.next.prev = node;
+      this.next = node;
+    }
+  }
+
+  /**
+   * List of datanodes which don't currently have block report leases.
+   */
+  private final NodeData deferredHead = NodeData.ListHead("deferredHead");
+
+  /**
+   * List of datanodes which currently have block report leases.
+   */
+  private final NodeData pendingHead = NodeData.ListHead("pendingHead");
+
+  /**
+   * Maps datanode UUIDs to NodeData.
+   */
+  private final HashMap<String, NodeData> nodes = new HashMap<>();
+
+  /**
+   * The current length of the pending list.
+   */
+  private int numPending = 0;
+
+  /**
+   * The maximum number of leases to hand out at any given time.
+   */
+  private final int maxPending;
+
+  /**
+   * The number of milliseconds after which a lease will expire.
+   */
+  private final long leaseExpiryMs;
+
+  /**
+   * The next ID we will use for a block report lease.
+   */
+  private long nextId = ThreadLocalRandom.current().nextLong();
+
+  BlockReportLeaseManager(Configuration conf) {
+    this(conf.getInt(
+          DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES,
+          DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES_DEFAULT),
+        conf.getLong(
+          DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS,
+          DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS_DEFAULT));
+  }
+
+  BlockReportLeaseManager(int maxPending, long leaseExpiryMs) {
+    Preconditions.checkArgument(maxPending >= 1,
+        "Cannot set the maximum number of block report leases to a " +
+            "value less than 1.");
+    this.maxPending = maxPending;
+    Preconditions.checkArgument(leaseExpiryMs >= 1,
+        "Cannot set full block report lease expiry period to a value " +
+         "less than 1.");
+    this.leaseExpiryMs = leaseExpiryMs;
+  }
+
+  /**
+   * Get the next block report lease ID.  Any number is valid except 0.
+   */
+  private synchronized long getNextId() {
+    long id;
+    do {
+      id = nextId++;
+    } while (id == 0);
+    return id;
+  }
+
+  public synchronized void register(DatanodeDescriptor dn) {
+    registerNode(dn);
+  }
+
+  private synchronized NodeData registerNode(DatanodeDescriptor dn) {
+    if (nodes.containsKey(dn.getDatanodeUuid())) {
+      LOG.info("Can't register DN {} because it is already registered.",
+          dn.getDatanodeUuid());
+      return null;
+    }
+    NodeData node = new NodeData(dn.getDatanodeUuid());
+    deferredHead.addToBeginning(node);
+    nodes.put(dn.getDatanodeUuid(), node);
+    LOG.info("Registered DN {} ({}).", dn.getDatanodeUuid(), dn.getXferAddr());
+    return node;
+  }
+
+  private synchronized void remove(NodeData node) {
+    if (node.leaseId != 0) {
+      numPending--;
+      node.leaseId = 0;
+      node.leaseTimeMs = 0;
+    }
+    node.removeSelf();
+  }
+
+  public synchronized void unregister(DatanodeDescriptor dn) {
+    NodeData node = nodes.remove(dn.getDatanodeUuid());
+    if (node == null) {
+      LOG.info("Can't unregister DN {} because it is not currently " +
+          "registered.", dn.getDatanodeUuid());
+      return;
+    }
+    remove(node);
+  }
+
+  public synchronized long requestLease(DatanodeDescriptor dn) {
+    NodeData node = nodes.get(dn.getDatanodeUuid());
+    if (node == null) {
+      LOG.warn("DN {} ({}) requested a lease even though it wasn't yet " +
+          "registered.  Registering now.", dn.getDatanodeUuid(),
+          dn.getXferAddr());
+      node = registerNode(dn);
+    }
+    if (node.leaseId != 0) {
+      // The DataNode wants a new lease, even though it already has one.
+      // This can happen if the DataNode is restarted in between requesting
+      // a lease and using it.
+      LOG.debug("Removing existing BR lease 0x{} for DN {} in order to " +
+               "issue a new one.", Long.toHexString(node.leaseId),
+               dn.getDatanodeUuid());
+    }
+    remove(node);
+    long monotonicNowMs = Time.monotonicNow();
+    pruneExpiredPending(monotonicNowMs);
+    if (numPending >= maxPending) {
+      if (LOG.isDebugEnabled()) {
+        StringBuilder allLeases = new StringBuilder();
+        String prefix = "";
+        for (NodeData cur = pendingHead.next; cur != pendingHead;
+             cur = cur.next) {
+          allLeases.append(prefix).append(cur.datanodeUuid);
+          prefix = ", ";
+        }
+        LOG.debug("Can't create a new BR lease for DN {}, because " +
+              "numPending equals maxPending at {}.  Current leases: {}",
+              dn.getDatanodeUuid(), numPending, allLeases.toString());
+      }
+      return 0;
+    }
+    numPending++;
+    node.leaseId = getNextId();
+    node.leaseTimeMs = monotonicNowMs;
+    pendingHead.addToEnd(node);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Created a new BR lease 0x{} for DN {}.  numPending = {}",
+          Long.toHexString(node.leaseId), dn.getDatanodeUuid(), numPending);
+    }
+    return node.leaseId;
+  }
+
+  private synchronized boolean pruneIfExpired(long monotonicNowMs,
+                                              NodeData node) {
+    if (monotonicNowMs < node.leaseTimeMs + leaseExpiryMs) {
+      return false;
+    }
+    LOG.info("Removing expired block report lease 0x{} for DN {}.",
+        Long.toHexString(node.leaseId), node.datanodeUuid);
+    Preconditions.checkState(node.leaseId != 0);
+    remove(node);
+    deferredHead.addToBeginning(node);
+    return true;
+  }
+
+  private synchronized void pruneExpiredPending(long monotonicNowMs) {
+    NodeData cur = pendingHead.next;
+    while (cur != pendingHead) {
+      NodeData next = cur.next;
+      if (!pruneIfExpired(monotonicNowMs, cur)) {
+        return;
+      }
+      cur = next;
+    }
+    LOG.trace("No entries remaining in the pending list.");
+  }
+
+  public synchronized boolean checkLease(DatanodeDescriptor dn,
+                                         long monotonicNowMs, long id) {
+    if (id == 0) {
+      LOG.debug("Datanode {} is using BR lease id 0x0 to bypass " +
+          "rate-limiting.", dn.getDatanodeUuid());
+      return true;
+    }
+    NodeData node = nodes.get(dn.getDatanodeUuid());
+    if (node == null) {
+      LOG.info("BR lease 0x{} is not valid for unknown datanode {}",
+          Long.toHexString(id), dn.getDatanodeUuid());
+      return false;
+    }
+    if (node.leaseId == 0) {
+      LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " +
+               "is not in the pending set.",
+               Long.toHexString(id), dn.getDatanodeUuid());
+      return false;
+    }
+    if (pruneIfExpired(monotonicNowMs, node)) {
+      LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " +
+               "has expired.", Long.toHexString(id), dn.getDatanodeUuid());
+      return false;
+    }
+    if (id != node.leaseId) {
+      LOG.warn("BR lease 0x{} is not valid for DN {}.  Expected BR lease 0x{}.",
+          Long.toHexString(id), dn.getDatanodeUuid(),
+          Long.toHexString(node.leaseId));
+      return false;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("BR lease 0x{} is valid for DN {}.",
+          Long.toHexString(id), dn.getDatanodeUuid());
+    }
+    return true;
+  }
+
+  public synchronized long removeLease(DatanodeDescriptor dn) {
+    NodeData node = nodes.get(dn.getDatanodeUuid());
+    if (node == null) {
+      LOG.info("Can't remove lease for unknown datanode {}",
+               dn.getDatanodeUuid());
+      return 0;
+    }
+    long id = node.leaseId;
+    if (id == 0) {
+      LOG.debug("DN {} has no lease to remove.", dn.getDatanodeUuid());
+      return 0;
+    }
+    remove(node);
+    deferredHead.addToEnd(node);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Removed BR lease 0x{} for DN {}.  numPending = {}",
+                Long.toHexString(id), dn.getDatanodeUuid(), numPending);
+    }
+    return id;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 486cb05..20a1f21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -540,6 +540,7 @@ public class DatanodeManager {
     blockManager.removeBlocksAssociatedTo(nodeInfo);
     networktopology.remove(nodeInfo);
     decrementVersionCount(nodeInfo.getSoftwareVersion());
+    blockManager.getBlockReportLeaseManager().unregister(nodeInfo);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("remove datanode " + nodeInfo);
@@ -602,6 +603,7 @@ public class DatanodeManager {
     networktopology.add(node); // may throw InvalidTopologyException
     host2DatanodeMap.add(node);
     checkIfClusterIsNowMultiRack(node);
+    blockManager.getBlockReportLeaseManager().register(node);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug(getClass().getSimpleName() + ".addDatanode: "

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 63a0bb6..ea1abbd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -29,6 +29,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Joiner;
 import org.apache.commons.logging.Log;
@@ -355,9 +357,10 @@ class BPServiceActor implements Runnable {
   void triggerBlockReportForTests() {
     synchronized (pendingIncrementalBRperStorage) {
       scheduler.scheduleHeartbeat();
-      long nextBlockReportTime = scheduler.scheduleBlockReport(0);
+      long oldBlockReportTime = scheduler.nextBlockReportTime;
+      scheduler.forceFullBlockReportNow();
       pendingIncrementalBRperStorage.notifyAll();
-      while (nextBlockReportTime - scheduler.nextBlockReportTime >= 0) {
+      while (oldBlockReportTime == scheduler.nextBlockReportTime) {
         try {
           pendingIncrementalBRperStorage.wait(100);
         } catch (InterruptedException e) {
@@ -419,12 +422,7 @@ class BPServiceActor implements Runnable {
    * @return DatanodeCommands returned by the NN. May be null.
    * @throws IOException
    */
-  List<DatanodeCommand> blockReport() throws IOException {
-    // send block report if timer has expired.
-    if (!scheduler.isBlockReportDue()) {
-      return null;
-    }
-
+  List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
     final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
 
     // Flush any block information that precedes the block report. Otherwise
@@ -460,7 +458,7 @@ class BPServiceActor implements Runnable {
         // Below split threshold, send all reports in a single message.
         DatanodeCommand cmd = bpNamenode.blockReport(
             bpRegistration, bpos.getBlockPoolId(), reports,
-              new BlockReportContext(1, 0, reportId));
+              new BlockReportContext(1, 0, reportId, fullBrLeaseId));
         numRPCs = 1;
         numReportsSent = reports.length;
         if (cmd != null) {
@@ -472,7 +470,8 @@ class BPServiceActor implements Runnable {
           StorageBlockReport singleReport[] = { reports[r] };
           DatanodeCommand cmd = bpNamenode.blockReport(
               bpRegistration, bpos.getBlockPoolId(), singleReport,
-              new BlockReportContext(reports.length, r, reportId));
+              new BlockReportContext(reports.length, r, reportId,
+                  fullBrLeaseId));
           numReportsSent++;
           numRPCs++;
           if (cmd != null) {
@@ -538,7 +537,8 @@ class BPServiceActor implements Runnable {
     return cmd;
   }
   
-  HeartbeatResponse sendHeartBeat() throws IOException {
+  HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
+      throws IOException {
     StorageReport[] reports =
         dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
     if (LOG.isDebugEnabled()) {
@@ -557,7 +557,8 @@ class BPServiceActor implements Runnable {
         dn.getXmitsInProgress(),
         dn.getXceiverCount(),
         numFailedVolumes,
-        volumeFailureSummary);
+        volumeFailureSummary,
+        requestBlockReportLease);
   }
   
   //This must be called only by BPOfferService
@@ -625,8 +626,9 @@ class BPServiceActor implements Runnable {
     LOG.info("For namenode " + nnAddr + " using"
         + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
         + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
-        + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
+        + " Initial delay: " + dnConf.initialBlockReportDelayMs + "msec"
         + "; heartBeatInterval=" + dnConf.heartBeatInterval);
+    long fullBlockReportLeaseId = 0;
 
     //
     // Now loop for a long time....
@@ -639,6 +641,7 @@ class BPServiceActor implements Runnable {
         // Every so often, send heartbeat or block-report
         //
         final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
+        HeartbeatResponse resp = null;
         if (sendHeartbeat) {
           //
           // All heartbeat messages include following info:
@@ -647,10 +650,23 @@ class BPServiceActor implements Runnable {
           // -- Total capacity
           // -- Bytes remaining
           //
+          boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
+                  scheduler.isBlockReportDue(startTime);
           scheduler.scheduleNextHeartbeat();
           if (!dn.areHeartbeatsDisabledForTests()) {
-            HeartbeatResponse resp = sendHeartBeat();
+            resp = sendHeartBeat(requestBlockReportLease);
             assert resp != null;
+            if (resp.getFullBlockReportLeaseId() != 0) {
+              if (fullBlockReportLeaseId != 0) {
+                LOG.warn(nnAddr + " sent back a full block report lease " +
+                        "ID of 0x" +
+                        Long.toHexString(resp.getFullBlockReportLeaseId()) +
+                        ", but we already have a lease ID of 0x" +
+                        Long.toHexString(fullBlockReportLeaseId) + ". " +
+                        "Overwriting old lease ID.");
+              }
+              fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
+            }
             dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
 
             // If the state of this NN has changed (eg STANDBY->ACTIVE)
@@ -682,7 +698,16 @@ class BPServiceActor implements Runnable {
           reportReceivedDeletedBlocks();
         }
 
-        List<DatanodeCommand> cmds = blockReport();
+        List<DatanodeCommand> cmds = null;
+        boolean forceFullBr =
+            scheduler.forceFullBlockReport.getAndSet(false);
+        if (forceFullBr) {
+          LOG.info("Forcing a full block report to " + nnAddr);
+        }
+        if ((fullBlockReportLeaseId != 0) || forceFullBr) {
+          cmds = blockReport(fullBlockReportLeaseId);
+          fullBlockReportLeaseId = 0;
+        }
         processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
 
         DatanodeCommand cmd = cacheReport();
@@ -765,7 +790,7 @@ class BPServiceActor implements Runnable {
     bpos.registrationSucceeded(this, bpRegistration);
 
     // random short delay - helps scatter the BR from all DNs
-    scheduler.scheduleBlockReport(dnConf.initialBlockReportDelay);
+    scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs);
   }
 
 
@@ -958,7 +983,7 @@ class BPServiceActor implements Runnable {
     } else {
       LOG.info(bpos.toString() + ": scheduling a full block report.");
       synchronized(pendingIncrementalBRperStorage) {
-        scheduler.scheduleBlockReport(0);
+        scheduler.forceFullBlockReportNow();
         pendingIncrementalBRperStorage.notifyAll();
       }
     }
@@ -1011,6 +1036,9 @@ class BPServiceActor implements Runnable {
     @VisibleForTesting
     boolean resetBlockReportTime = true;
 
+    private final AtomicBoolean forceFullBlockReport =
+        new AtomicBoolean(false);
+
     private final long heartbeatIntervalMs;
     private final long blockReportIntervalMs;
 
@@ -1042,8 +1070,13 @@ class BPServiceActor implements Runnable {
       return (nextHeartbeatTime - startTime <= 0);
     }
 
-    boolean isBlockReportDue() {
-      return nextBlockReportTime - monotonicNow() <= 0;
+    boolean isBlockReportDue(long curTime) {
+      return nextBlockReportTime - curTime <= 0;
+    }
+
+    void forceFullBlockReportNow() {
+      forceFullBlockReport.set(true);
+      resetBlockReportTime = true;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
index 4b7fbc3..42b1b46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
@@ -82,7 +82,7 @@ public class DNConf {
   final long heartBeatInterval;
   final long blockReportInterval;
   final long blockReportSplitThreshold;
-  final long initialBlockReportDelay;
+  final long initialBlockReportDelayMs;
   final long cacheReportInterval;
   final long dfsclientSlowIoWarningThresholdMs;
   final long datanodeSlowIoWarningThresholdMs;
@@ -159,7 +159,7 @@ public class DNConf {
           + "greater than or equal to" + "dfs.blockreport.intervalMsec."
           + " Setting initial delay to 0 msec:");
     }
-    initialBlockReportDelay = initBRDelay;
+    initialBlockReportDelayMs = initBRDelay;
     
     heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
         DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 28bc5f1..a4e71bc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -3972,7 +3972,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
   HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
       StorageReport[] reports, long cacheCapacity, long cacheUsed,
       int xceiverCount, int xmitsInProgress, int failedVolumes,
-      VolumeFailureSummary volumeFailureSummary) throws IOException {
+      VolumeFailureSummary volumeFailureSummary,
+      boolean requestFullBlockReportLease) throws IOException {
     readLock();
     try {
       //get datanode commands
@@ -3981,13 +3982,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
           nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
           xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
-      
+      long blockReportLeaseId = 0;
+      if (requestFullBlockReportLease) {
+        blockReportLeaseId =  blockManager.requestBlockReportLeaseId(nodeReg);
+      }
       //create ha status
       final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
           haContext.getState().getServiceState(),
           getFSImage().getLastAppliedOrWrittenTxId());
 
-      return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
+      return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
+          blockReportLeaseId);
     } finally {
       readUnlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index cc43d3c..6e7f3a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
@@ -1270,13 +1271,13 @@ class NameNodeRpcServer implements NamenodeProtocols {
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
       StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
       int xmitsInProgress, int xceiverCount,
-      int failedVolumes, VolumeFailureSummary volumeFailureSummary)
-      throws IOException {
+      int failedVolumes, VolumeFailureSummary volumeFailureSummary,
+      boolean requestFullBlockReportLease) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     return namesystem.handleHeartbeat(nodeReg, report,
         dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
-        failedVolumes, volumeFailureSummary);
+        failedVolumes, volumeFailureSummary, requestFullBlockReportLease);
   }
 
   @Override // DatanodeProtocol
@@ -1302,6 +1303,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
           blocks, context, (r == reports.length - 1));
       metrics.incrStorageBlockReportOps();
     }
+    BlockManagerFaultInjector.getInstance().
+        incomingBlockReportRpc(nodeReg, context);
 
     if (nn.getFSImage().isUpgradeFinalized() &&
         !namesystem.isRollingUpgrade() &&

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
index d0b0282..5bcd719 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
@@ -31,14 +31,33 @@ import org.apache.hadoop.classification.InterfaceAudience;
  */
 @InterfaceAudience.Private
 public class BlockReportContext {
+  /**
+   * The total number of RPCs contained in the block report.
+   */
   private final int totalRpcs;
+
+  /**
+   * The index of this particular RPC.
+   */
   private final int curRpc;
+
+  /**
+   * A 64-bit ID which identifies the block report as a whole.
+   */
   private final long reportId;
 
-  public BlockReportContext(int totalRpcs, int curRpc, long reportId) {
+  /**
+   * The lease ID which this block report is using, or 0 if this block report is
+   * bypassing rate-limiting.
+   */
+  private final long leaseId;
+
+  public BlockReportContext(int totalRpcs, int curRpc,
+                            long reportId, long leaseId) {
     this.totalRpcs = totalRpcs;
     this.curRpc = curRpc;
     this.reportId = reportId;
+    this.leaseId = leaseId;
   }
 
   public int getTotalRpcs() {
@@ -52,4 +71,8 @@ public class BlockReportContext {
   public long getReportId() {
     return reportId;
   }
+
+  public long getLeaseId() {
+    return leaseId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index a3b6004..dfe0813 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -102,6 +102,8 @@ public interface DatanodeProtocol {
    * @param xceiverCount number of active transceiver threads
    * @param failedVolumes number of failed volumes
    * @param volumeFailureSummary info about volume failures
+   * @param requestFullBlockReportLease whether to request a full block
+   *                                    report lease.
    * @throws IOException on error
    */
   @Idempotent
@@ -112,7 +114,8 @@ public interface DatanodeProtocol {
                                        int xmitsInProgress,
                                        int xceiverCount,
                                        int failedVolumes,
-                                       VolumeFailureSummary volumeFailureSummary)
+                                       VolumeFailureSummary volumeFailureSummary,
+                                       boolean requestFullBlockReportLease)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
index d00179e..8d6384e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java
@@ -34,12 +34,16 @@ public class HeartbeatResponse {
   private final NNHAStatusHeartbeat haStatus;
 
   private final RollingUpgradeStatus rollingUpdateStatus;
+
+  private final long fullBlockReportLeaseId;
   
   public HeartbeatResponse(DatanodeCommand[] cmds,
-      NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus) {
+      NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus,
+      long fullBlockReportLeaseId) {
     commands = cmds;
     this.haStatus = haStatus;
     this.rollingUpdateStatus = rollingUpdateStatus;
+    this.fullBlockReportLeaseId = fullBlockReportLeaseId;
   }
   
   public DatanodeCommand[] getCommands() {
@@ -53,4 +57,8 @@ public class HeartbeatResponse {
   public RollingUpgradeStatus getRollingUpdateStatus() {
     return rollingUpdateStatus;
   }
+
+  public long getFullBlockReportLeaseId() {
+    return fullBlockReportLeaseId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java
index a102c82..2f7d334 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
 /**
- * A BlockCommand is an instruction to a datanode to register with the namenode.
+ * A RegisterCommand is an instruction to a datanode to register with the namenode.
  * This command can't be combined with other commands in the same response.
  * This is because after the datanode processes RegisterCommand, it will skip
  * the rest of the DatanodeCommands in the same HeartbeatResponse.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index bce5f56..ea203e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -194,6 +194,7 @@ message HeartbeatRequestProto {
   optional uint64 cacheCapacity = 6 [ default = 0 ];
   optional uint64 cacheUsed = 7 [default = 0 ];
   optional VolumeFailureSummaryProto volumeFailureSummary = 8;
+  optional bool requestFullBlockReportLease = 9 [ default = false ];
 }
 
 /**
@@ -217,6 +218,7 @@ message HeartbeatResponseProto {
   repeated DatanodeCommandProto cmds = 1; // Returned commands can be null
   required NNHAStatusHeartbeatProto haStatus = 2;
   optional RollingUpgradeStatusProto rollingUpgradeStatus = 3;
+  optional uint64 fullBlockReportLeaseId = 4 [ default = 0 ];
 }
 
 /**
@@ -246,6 +248,10 @@ message BlockReportContextProto  {
 
   // The unique 64-bit ID of this block report
   required int64 id = 3;
+
+  // The block report lease ID, or 0 if we are sending without a lease to
+  // bypass rate-limiting.
+  optional uint64 leaseId = 4 [ default = 0 ];
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index bdb01b2..e281248 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -578,6 +578,27 @@
 </property>
 
 <property>
+  <name>dfs.namenode.max.full.block.report.leases</name>
+  <value>6</value>
+  <description>The maximum number of leases for full block reports that the
+    NameNode will issue at any given time.  This prevents the NameNode from
+    being flooded with full block reports that use up all the RPC handler
+    threads.  This number should never be more than the number of RPC handler
+    threads or less than 1.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.full.block.report.lease.length.ms</name>
+  <value>300000</value>
+  <description>
+    The number of milliseconds that the NameNode will wait before invalidating
+    a full block report lease.  This prevents a crashed DataNode from
+    permanently using up a full block report lease.
+  </description>
+</property>
+
+<property>
   <name>dfs.datanode.directoryscan.interval</name>
   <value>21600</value>
   <description>Interval in seconds for Datanode to scan data directories and

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
index f0dab4c..9ead765 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
@@ -221,7 +221,7 @@ public class TestBlockListAsLongs {
     request.set(null);
     nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
     nn.blockReport(reg, "pool", sbr,
-        new BlockReportContext(1, 0, System.nanoTime()));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L));
     BlockReportRequestProto proto = request.get();
     assertNotNull(proto);
     assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@@ -231,7 +231,7 @@ public class TestBlockListAsLongs {
     request.set(null);
     nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
     nn.blockReport(reg, "pool", sbr,
-        new BlockReportContext(1, 0, System.nanoTime()));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L));
     proto = request.get();
     assertNotNull(proto);
     assertFalse(proto.getReports(0).getBlocksList().isEmpty());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportRateLimiting.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportRateLimiting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportRateLimiting.java
new file mode 100644
index 0000000..fc5f9e7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportRateLimiting.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Supplier;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.commons.lang.mutable.MutableObject;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class TestBlockReportRateLimiting {
+  static final Log LOG = LogFactory.getLog(TestBlockReportRateLimiting.class);
+
+  private static void setFailure(AtomicReference<String> failure,
+                                 String what) {
+    failure.compareAndSet("", what);
+    LOG.error("Test error: " + what);
+  }
+
+  @After
+  public void restoreNormalBlockManagerFaultInjector() {
+    BlockManagerFaultInjector.instance = new BlockManagerFaultInjector();
+  }
+
+  @BeforeClass
+  public static void raiseBlockManagerLogLevels() {
+    GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
+    GenericTestUtils.setLogLevel(BlockReportLeaseManager.LOG, Level.ALL);
+  }
+
+  @Test(timeout=180000)
+  public void testRateLimitingDuringDataNodeStartup() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, 1);
+    conf.setLong(DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS,
+        20L * 60L * 1000L);
+
+    final Semaphore fbrSem = new Semaphore(0);
+    final HashSet<DatanodeID> expectedFbrDns = new HashSet<>();
+    final HashSet<DatanodeID> fbrDns = new HashSet<>();
+    final AtomicReference<String> failure = new AtomicReference<String>("");
+
+    final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() {
+      private int numLeases = 0;
+
+      @Override
+      public void incomingBlockReportRpc(DatanodeID nodeID,
+                    BlockReportContext context) throws IOException {
+        LOG.info("Incoming full block report from " + nodeID +
+            ".  Lease ID = 0x" + Long.toHexString(context.getLeaseId()));
+        if (context.getLeaseId() == 0) {
+          setFailure(failure, "Got unexpected rate-limiting-" +
+              "bypassing full block report RPC from " + nodeID);
+        }
+        fbrSem.acquireUninterruptibly();
+        synchronized (this) {
+          fbrDns.add(nodeID);
+          if (!expectedFbrDns.remove(nodeID)) {
+            setFailure(failure, "Got unexpected full block report " +
+                "RPC from " + nodeID + ".  expectedFbrDns = " +
+                Joiner.on(", ").join(expectedFbrDns));
+          }
+          LOG.info("Proceeding with full block report from " +
+              nodeID + ".  Lease ID = 0x" +
+              Long.toHexString(context.getLeaseId()));
+        }
+      }
+
+      @Override
+      public void requestBlockReportLease(DatanodeDescriptor node,
+                                          long leaseId) {
+        if (leaseId == 0) {
+          return;
+        }
+        synchronized (this) {
+          numLeases++;
+          expectedFbrDns.add(node);
+          LOG.info("requestBlockReportLease(node=" + node +
+              ", leaseId=0x" + Long.toHexString(leaseId) + ").  " +
+              "expectedFbrDns = " +  Joiner.on(", ").join(expectedFbrDns));
+          if (numLeases > 1) {
+            setFailure(failure, "More than 1 lease was issued at once.");
+          }
+        }
+      }
+
+      @Override
+      public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
+        LOG.info("removeBlockReportLease(node=" + node +
+                 ", leaseId=0x" + Long.toHexString(leaseId) + ")");
+        synchronized (this) {
+          numLeases--;
+        }
+      }
+    };
+    BlockManagerFaultInjector.instance = injector;
+
+    final int NUM_DATANODES = 5;
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    cluster.waitActive();
+    for (int n = 1; n <= NUM_DATANODES; n++) {
+      LOG.info("Waiting for " + n + " datanode(s) to report in.");
+      fbrSem.release();
+      Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
+      final int currentN = n;
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          synchronized (injector) {
+            if (fbrDns.size() > currentN) {
+              setFailure(failure, "Expected at most " + currentN +
+                  " datanodes to have sent a block report, but actually " +
+                  fbrDns.size() + " have.");
+            }
+            return (fbrDns.size() >= currentN);
+          }
+        }
+      }, 25, 50000);
+    }
+    cluster.shutdown();
+    Assert.assertEquals("", failure.get());
+  }
+
+  /**
+   * Start a 2-node cluster with only one block report lease.  When the
+   * first datanode gets a lease, kill it.  Then wait for the lease to
+   * expire, and the second datanode to send a full block report.
+   */
+  @Test(timeout=180000)
+  public void testLeaseExpiration() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setInt(DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, 1);
+    conf.setLong(DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS, 100L);
+
+    final Semaphore gotFbrSem = new Semaphore(0);
+    final AtomicReference<String> failure = new AtomicReference<String>("");
+    final AtomicReference<MiniDFSCluster> cluster =
+        new AtomicReference<>(null);
+    final BlockingQueue<Integer> datanodeToStop =
+        new ArrayBlockingQueue<Integer>(1);
+    final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() {
+      private String uuidToStop = "";
+
+      @Override
+      public void incomingBlockReportRpc(DatanodeID nodeID,
+                BlockReportContext context) throws IOException {
+        if (context.getLeaseId() == 0) {
+          setFailure(failure, "Got unexpected rate-limiting-" +
+              "bypassing full block report RPC from " + nodeID);
+        }
+        synchronized (this) {
+          if (uuidToStop.equals(nodeID.getDatanodeUuid())) {
+            throw new IOException("Injecting failure into block " +
+                "report RPC for " + nodeID);
+          }
+        }
+        gotFbrSem.release();
+      }
+
+      @Override
+      public void requestBlockReportLease(DatanodeDescriptor node,
+                                          long leaseId) {
+        if (leaseId == 0) {
+          return;
+        }
+        synchronized (this) {
+          if (uuidToStop.isEmpty()) {
+            MiniDFSCluster cl;
+            do {
+              cl = cluster.get();
+            } while (cl == null);
+            int datanodeIndexToStop = getDatanodeIndex(cl, node);
+            uuidToStop = node.getDatanodeUuid();
+            datanodeToStop.add(Integer.valueOf(datanodeIndexToStop));
+          }
+        }
+      }
+
+      private int getDatanodeIndex(MiniDFSCluster cl,
+                                   DatanodeDescriptor node) {
+        List<DataNode> datanodes = cl.getDataNodes();
+        for (int i = 0; i < datanodes.size(); i++) {
+          DataNode datanode = datanodes.get(i);
+          if (datanode.getDatanodeUuid().equals(node.getDatanodeUuid())) {
+            return i;
+          }
+        }
+        throw new RuntimeException("Failed to find UUID " +
+            node.getDatanodeUuid() + " in the list of datanodes.");
+      }
+
+      @Override
+      public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
+      }
+    };
+    BlockManagerFaultInjector.instance = injector;
+    cluster.set(new MiniDFSCluster.Builder(conf).numDataNodes(2).build());
+    cluster.get().waitActive();
+    int datanodeIndexToStop = datanodeToStop.take();
+    cluster.get().stopDataNode(datanodeIndexToStop);
+    gotFbrSem.acquire();
+    cluster.get().shutdown();
+    Assert.assertEquals("", failure.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
index bf167a5..39bd5d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java
@@ -59,6 +59,15 @@ public class TestDatanodeManager {
   //The number of times the registration / removal of nodes should happen
   final int NUM_ITERATIONS = 500;
 
+  private static DatanodeManager mockDatanodeManager(
+      FSNamesystem fsn, Configuration conf) throws IOException {
+    BlockManager bm = Mockito.mock(BlockManager.class);
+    BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
+    Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
+    DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
+    return dm;
+  }
+
   /**
    * This test sends a random sequence of node registrations and node removals
    * to the DatanodeManager (of nodes with different IDs and versions), and
@@ -70,8 +79,7 @@ public class TestDatanodeManager {
     //Create the DatanodeManager which will be tested
     FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
     Mockito.when(fsn.hasWriteLock()).thenReturn(true);
-    DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
-      fsn, new Configuration());
+    DatanodeManager dm = mockDatanodeManager(fsn, new Configuration());
 
     //Seed the RNG with a known value so test failures are easier to reproduce
     Random rng = new Random();
@@ -183,9 +191,8 @@ public class TestDatanodeManager {
         TestDatanodeManager.MyResolver.class, DNSToSwitchMapping.class);
     
     //create DatanodeManager
-    DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
-        fsn, conf);
-    
+    DatanodeManager dm = mockDatanodeManager(fsn, conf);
+
     //storageID to register.
     String storageID = "someStorageID-123";
     
@@ -258,7 +265,6 @@ public class TestDatanodeManager {
     HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"));
   }
 
-
   /**
    * Helper function that tests the DatanodeManagers SortedBlock function
    * we invoke this function with and without topology scripts
@@ -281,8 +287,7 @@ public class TestDatanodeManager {
       conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
         resourcePath.toString());
     }
-    DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
-      fsn, conf);
+    DatanodeManager dm = mockDatanodeManager(fsn, conf);
 
     // register 5 datanodes, each with different storage ID and type
     DatanodeInfo[] locs = new DatanodeInfo[5];

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index d73f63e..cea6865 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -115,7 +115,7 @@ public class TestNameNodePrunesMissingStorages {
       // Stop the DataNode and send fake heartbeat with missing storage.
       cluster.stopDataNode(0);
       cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
-          0, null);
+          0, null, true);
 
       // Check that the missing storage was pruned.
       assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index 64cc78b..f970b3f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -143,7 +144,8 @@ public class TestBPOfferService {
           Mockito.anyInt(),
           Mockito.anyInt(),
           Mockito.anyInt(),
-          Mockito.any(VolumeFailureSummary.class));
+          Mockito.any(VolumeFailureSummary.class),
+          Mockito.anyBoolean());
     mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
     return mock;
   }
@@ -164,7 +166,8 @@ public class TestBPOfferService {
     public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
       heartbeatCounts[nnIdx]++;
       return new HeartbeatResponse(new DatanodeCommand[0],
-          mockHaStatuses[nnIdx], null);
+          mockHaStatuses[nnIdx], null,
+          ThreadLocalRandom.current().nextLong() | 1L);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
index c65ef85..27d1cea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
@@ -126,7 +126,7 @@ public class TestBlockHasMultipleReplicasOnSameDN {
 
     // Should not assert!
     cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
-        new BlockReportContext(1, 0, System.nanoTime()));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L));
 
     // Get the block locations once again.
     locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index 973955f..38bdc64 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -39,6 +39,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -160,11 +161,12 @@ public class TestBlockRecovery {
             Mockito.anyInt(),
             Mockito.anyInt(),
             Mockito.anyInt(),
-            Mockito.any(VolumeFailureSummary.class)))
+            Mockito.any(VolumeFailureSummary.class),
+            Mockito.anyBoolean()))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
-            null));
+            null, ThreadLocalRandom.current().nextLong() | 1L));
 
     dn = new DataNode(conf, locations, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
index 0d7484c..b9b6512 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
@@ -57,7 +57,7 @@ public class TestBpServiceActorScheduler {
     for (final long now : getTimestamps()) {
       Scheduler scheduler = makeMockScheduler(now);
       assertTrue(scheduler.isHeartbeatDue(now));
-      assertTrue(scheduler.isBlockReportDue());
+      assertTrue(scheduler.isBlockReportDue(scheduler.monotonicNow()));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index bf80887..e784c7a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.concurrent.ThreadLocalRandom;
 
 import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
@@ -199,13 +200,13 @@ public class TestDatanodeProtocolRetryPolicy {
           heartbeatResponse = new HeartbeatResponse(
               new DatanodeCommand[]{RegisterCommand.REGISTER},
               new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
-              null);
+              null, ThreadLocalRandom.current().nextLong() | 1L);
         } else {
           LOG.info("mockito heartbeatResponse " + i);
           heartbeatResponse = new HeartbeatResponse(
               new DatanodeCommand[0],
               new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
-              null);
+              null, ThreadLocalRandom.current().nextLong() | 1L);
         }
         return heartbeatResponse;
       }
@@ -217,7 +218,8 @@ public class TestDatanodeProtocolRetryPolicy {
            Mockito.anyInt(),
            Mockito.anyInt(),
            Mockito.anyInt(),
-           Mockito.any(VolumeFailureSummary.class));
+           Mockito.any(VolumeFailureSummary.class),
+           Mockito.anyBoolean());
 
     dn = new DataNode(conf, locations, null) {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 58932fb..cb4022e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -21,6 +21,7 @@ import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doReturn;
@@ -31,6 +32,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -159,11 +161,14 @@ public class TestFsDatasetCache {
       throws IOException {
     NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
         fsImage.getLastAppliedOrWrittenTxId());
-    HeartbeatResponse response = new HeartbeatResponse(cmds, ha, null);
+    HeartbeatResponse response =
+        new HeartbeatResponse(cmds, ha, null,
+            ThreadLocalRandom.current().nextLong() | 1L);
     doReturn(response).when(spyNN).sendHeartbeat(
         (DatanodeRegistration) any(),
         (StorageReport[]) any(), anyLong(), anyLong(),
-        anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any());
+        anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
+        anyBoolean());
   }
 
   private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index b150b0d..67bbefe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -40,7 +40,7 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
       LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
       StorageBlockReport[] singletonReport = { report };
       cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
-          new BlockReportContext(reports.length, i, System.nanoTime()));
+          new BlockReportContext(reports.length, i, System.nanoTime(), 0L));
       i++;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
index dca3c88..fd19ba6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
@@ -36,6 +36,6 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
                                   StorageBlockReport[] reports) throws IOException {
     LOG.info("Sending combined block reports for " + dnR);
     cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
-        new BlockReportContext(1, 0, System.nanoTime()));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index ecb28dc..a6032c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -106,7 +106,7 @@ public class TestStorageReport {
         any(DatanodeRegistration.class),
         captor.capture(),
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
-        Mockito.any(VolumeFailureSummary.class));
+        Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean());
 
     StorageReport[] reports = captor.getValue();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 2964f9a..39894b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -968,7 +968,7 @@ public class NNThroughputBenchmark implements Tool {
           new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
       };
       dataNodeProto.blockReport(dnRegistration, bpid, reports,
-              new BlockReportContext(1, 0, System.nanoTime()));
+              new BlockReportContext(1, 0, System.nanoTime(), 0L));
     }
 
     /**
@@ -981,7 +981,7 @@ public class NNThroughputBenchmark implements Tool {
       StorageReport[] rep = { new StorageReport(storage, false,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
-          0L, 0L, 0, 0, 0, null).getCommands();
+          0L, 0L, 0, 0, 0, null, true).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1030,7 +1030,7 @@ public class NNThroughputBenchmark implements Tool {
       StorageReport[] rep = { new StorageReport(storage,
           false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
-          rep, 0L, 0L, 0, 0, 0, null).getCommands();
+          rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
@@ -1213,7 +1213,7 @@ public class NNThroughputBenchmark implements Tool {
       StorageBlockReport[] report = { new StorageBlockReport(
           dn.storage, dn.getBlockReportList()) };
       dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
-          new BlockReportContext(1, 0, System.nanoTime()));
+          new BlockReportContext(1, 0, System.nanoTime(), 0L));
       long end = Time.now();
       return end-start;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/378bb484/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index f76c7fc..13ca9f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -117,7 +117,7 @@ public class NameNodeAdapter {
       DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
-        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null);
+        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true);
   }
 
   public static boolean setReplication(final FSNamesystem ns,


Mime
View raw message