hadoop-common-commits mailing list archives

From: w...@apache.org
Subject: hadoop git commit: HDFS-7960. The full block report should prune zombie storages even if they're not empty. Contributed by Colin McCabe and Eddy Xu.
Date: Tue, 24 Mar 2015 05:01:44 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk d7e3c3364 -> 50ee8f4e6


HDFS-7960. The full block report should prune zombie storages even if they're not empty. Contributed by Colin McCabe and Eddy Xu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/50ee8f4e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/50ee8f4e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/50ee8f4e

Branch: refs/heads/trunk
Commit: 50ee8f4e67a66aa77c5359182f61f3e951844db6
Parents: d7e3c33
Author: Andrew Wang <wang@apache.org>
Authored: Mon Mar 23 22:00:34 2015 -0700
Committer: Andrew Wang <wang@apache.org>
Committed: Mon Mar 23 22:00:34 2015 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../DatanodeProtocolClientSideTranslatorPB.java |   5 +-
 .../DatanodeProtocolServerSideTranslatorPB.java |   4 +-
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java |  15 +++
 .../server/blockmanagement/BlockManager.java    |  53 +++++++-
 .../blockmanagement/DatanodeDescriptor.java     |  51 ++++++-
 .../blockmanagement/DatanodeStorageInfo.java    |  13 +-
 .../hdfs/server/datanode/BPServiceActor.java    |  34 +++--
 .../hdfs/server/namenode/NameNodeRpcServer.java |  11 +-
 .../server/protocol/BlockReportContext.java     |  52 +++++++
 .../hdfs/server/protocol/DatanodeProtocol.java  |  10 +-
 .../src/main/proto/DatanodeProtocol.proto       |  14 ++
 .../hdfs/protocol/TestBlockListAsLongs.java     |   7 +-
 .../blockmanagement/TestBlockManager.java       |   8 +-
 .../TestNameNodePrunesMissingStorages.java      | 135 ++++++++++++++++++-
 .../server/datanode/BlockReportTestBase.java    |   4 +-
 .../server/datanode/TestBPOfferService.java     |  10 +-
 .../TestBlockHasMultipleReplicasOnSameDN.java   |   4 +-
 .../datanode/TestDataNodeVolumeFailure.java     |   3 +-
 .../TestDatanodeProtocolRetryPolicy.java        |   4 +-
 ...TestDnRespectsBlockReportSplitThreshold.java |   7 +-
 .../TestNNHandlesBlockReportPerStorage.java     |   7 +-
 .../TestNNHandlesCombinedBlockReport.java       |   4 +-
 .../server/datanode/TestTriggerBlockReport.java |   7 +-
 .../server/namenode/NNThroughputBenchmark.java  |   9 +-
 .../hdfs/server/namenode/TestDeadDatanode.java  |   4 +-
 .../hdfs/server/namenode/ha/TestDNFencing.java  |   4 +-
 27 files changed, 433 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
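At a high level, the change works like this: the DataNode tags every RPC of a full block report with a unique 64-bit report id, the NameNode stamps each reported storage with that id, and once all RPCs of the report have arrived, any storage still carrying a stale id is pruned as a zombie, even if it still holds blocks. The following minimal sketch (simplified types, not the Hadoop classes) illustrates the pruning criterion:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal sketch of the zombie criterion: storages not stamped with the
// current report id missed the whole report and are pruned, even if they
// still hold blocks.
public class ZombiePruneSketch {
  static List<String> findZombies(Map<String, Long> lastReportIdByStorage,
                                  long curReportId) {
    List<String> zombies = new ArrayList<String>();
    for (Map.Entry<String, Long> e : lastReportIdByStorage.entrySet()) {
      if (e.getValue() != curReportId) {
        zombies.add(e.getKey());
      }
    }
    return zombies;
  }

  public static void main(String[] args) {
    Map<String, Long> storages = new HashMap<String, Long>();
    storages.put("DS-1", 0xabcL);   // stamped by the current report
    storages.put("DS-2", 0x123L);   // stale id: no RPC mentioned it
    System.out.println(findZombies(storages, 0xabcL));   // [DS-2]
  }
}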


http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index d2891e3..3dd5fb3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1241,6 +1241,9 @@ Release 2.7.0 - UNRELEASED
     provided by the client is larger than the one stored in the datanode.
     (Brahma Reddy Battula via szetszwo)
 
+    HDFS-7960. The full block report should prune zombie storages even if
+    they're not empty. (cmccabe and Eddy Xu via wang)
+
     BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
       HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
index c4003f1..825e835 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlo
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -169,7 +170,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
 
   @Override
   public DatanodeCommand blockReport(DatanodeRegistration registration,
-      String poolId, StorageBlockReport[] reports) throws IOException {
+      String poolId, StorageBlockReport[] reports, BlockReportContext context)
+        throws IOException {
     BlockReportRequestProto.Builder builder = BlockReportRequestProto
         .newBuilder().setRegistration(PBHelper.convert(registration))
         .setBlockPoolId(poolId);
@@ -191,6 +193,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
       }
       builder.addReports(reportBuilder.build());
     }
+    builder.setContext(PBHelper.convert(context));
     BlockReportResponseProto resp;
     try {
       resp = rpcProxy.blockReport(NULL_CONTROLLER, builder.build());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
index e18081f..873eb6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
@@ -161,7 +161,9 @@ public class DatanodeProtocolServerSideTranslatorPB implements
     }
     try {
       cmd = impl.blockReport(PBHelper.convert(request.getRegistration()),
-          request.getBlockPoolId(), report);
+          request.getBlockPoolId(), report,
+          request.hasContext() ?
+              PBHelper.convert(request.getContext()) : null);
     } catch (IOException e) {
       throw new ServiceException(e);
     }
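The hasContext() check above is what keeps the RPC backward compatible: context is an optional protobuf field (added to DatanodeProtocol.proto later in this diff), so a report from an older DataNode deserializes without it and the NameNode receives a null context, skipping zombie pruning. A hedged sketch of such an old-style request (the 'reg' registration proto is assumed built elsewhere):

// Hypothetical old-style request: no setContext(...) call, so field 4 is
// absent on the wire and hasContext() is false on the receiving side.
BlockReportRequestProto oldStyle = BlockReportRequestProto.newBuilder()
    .setRegistration(reg)          // assumption: built elsewhere
    .setBlockPoolId("pool")
    .build();
assert !oldStyle.hasContext();     // translator passes null to blockReport()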

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index fad1d2c..b841850 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rollin
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
@@ -123,6 +124,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHe
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockKeyProto;
 import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.BlockProto;
@@ -194,6 +196,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
@@ -3009,4 +3012,16 @@ public class PBHelper {
     return targetPinnings;
   }
 
+  public static BlockReportContext convert(BlockReportContextProto proto) {
+    return new BlockReportContext(proto.getTotalRpcs(),
+        proto.getCurRpc(), proto.getId());
+  }
+
+  public static BlockReportContextProto convert(BlockReportContext context) {
+    return BlockReportContextProto.newBuilder().
+        setTotalRpcs(context.getTotalRpcs()).
+        setCurRpc(context.getCurRpc()).
+        setId(context.getReportId()).
+        build();
+  }
 }
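The two converters above are inverses; a quick round-trip check one might write against them (a sketch, not a test from this commit):

// Round trip: Java object -> proto -> Java object preserves every field.
BlockReportContext ctx = new BlockReportContext(4, 2, 0xdeadbeefL);
BlockReportContextProto proto = PBHelper.convert(ctx);
BlockReportContext back = PBHelper.convert(proto);
assert back.getTotalRpcs() == 4;
assert back.getCurRpc() == 2;
assert back.getReportId() == 0xdeadbeefL;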

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 674c0ea..91cfead 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1770,7 +1771,8 @@ public class BlockManager {
    */
   public boolean processReport(final DatanodeID nodeID,
       final DatanodeStorage storage,
-      final BlockListAsLongs newReport) throws IOException {
+      final BlockListAsLongs newReport, BlockReportContext context,
+      boolean lastStorageInRpc) throws IOException {
     namesystem.writeLock();
     final long startTime = Time.monotonicNow(); //after acquiring write lock
     final long endTime;
@@ -1809,6 +1811,29 @@ public class BlockManager {
       }
       
       storageInfo.receivedBlockReport();
+      if (context != null) {
+        storageInfo.setLastBlockReportId(context.getReportId());
+        if (lastStorageInRpc) {
+          int rpcsSeen = node.updateBlockReportContext(context);
+          if (rpcsSeen >= context.getTotalRpcs()) {
+            List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
+            if (zombies.isEmpty()) {
+              LOG.debug("processReport 0x{}: no zombie storages found.",
+                  Long.toHexString(context.getReportId()));
+            } else {
+              for (DatanodeStorageInfo zombie : zombies) {
+                removeZombieReplicas(context, zombie);
+              }
+            }
+            node.clearBlockReportContext();
+          } else {
+            LOG.debug("processReport 0x{}: {} more RPCs remaining in this " +
+                    "report.", Long.toHexString(context.getReportId()),
+                (context.getTotalRpcs() - rpcsSeen)
+            );
+          }
+        }
+      }
     } finally {
       endTime = Time.monotonicNow();
       namesystem.writeUnlock();
@@ -1833,6 +1858,32 @@ public class BlockManager {
     return !node.hasStaleStorages();
   }
 
+  private void removeZombieReplicas(BlockReportContext context,
+      DatanodeStorageInfo zombie) {
+    LOG.warn("processReport 0x{}: removing zombie storage {}, which no " +
+             "longer exists on the DataNode.",
+              Long.toHexString(context.getReportId()), zombie.getStorageID());
+    assert(namesystem.hasWriteLock());
+    Iterator<BlockInfoContiguous> iter = zombie.getBlockIterator();
+    int prevBlocks = zombie.numBlocks();
+    while (iter.hasNext()) {
+      BlockInfoContiguous block = iter.next();
+      // We assume that a block can be on only one storage in a DataNode.
+      // That's why we pass in the DatanodeDescriptor rather than the
+      // DatanodeStorageInfo.
+      // TODO: remove this assumption in case we want to put a block on
+      // more than one storage on a datanode (and because it's a difficult
+      // assumption to really enforce)
+      removeStoredBlock(block, zombie.getDatanodeDescriptor());
+      invalidateBlocks.remove(zombie.getDatanodeDescriptor(), block);
+    }
+    assert(zombie.numBlocks() == 0);
+    LOG.warn("processReport 0x{}: removed {} replicas from storage {}, " +
+            "which no longer exists on the DataNode.",
+            Long.toHexString(context.getReportId()), prevBlocks,
+            zombie.getStorageID());
+  }
+
   /**
    * Rescan the list of blocks which were previously postponed.
    */
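Two conditions gate the pruning in processReport above: the storage must be the last one in its RPC, and every RPC of the report id must have been observed; otherwise split reports still in flight could wrongly mark live storages as zombies. A self-contained sketch of that gate (a simplified stand-in, not the Hadoop code):

import java.util.BitSet;

// Pruning fires only on the final storage of an RPC, and only once all
// RPCs of the report have been seen.
public class PruneGateSketch {
  static boolean shouldPrune(BitSet rpcsSeen, int curRpc, int totalRpcs,
                             boolean lastStorageInRpc) {
    if (!lastStorageInRpc) {
      return false;                     // more storages follow in this RPC
    }
    rpcsSeen.set(curRpc);
    return rpcsSeen.cardinality() >= totalRpcs;
  }

  public static void main(String[] args) {
    BitSet seen = new BitSet(2);        // a report split into two RPCs
    System.out.println(shouldPrune(seen, 0, 2, true));   // false: 1 of 2
    System.out.println(shouldPrune(seen, 1, 2, true));   // true: complete
  }
}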

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 3f143e7..d0d7a72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,6 +32,7 @@ import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@@ -62,7 +65,25 @@ public class DatanodeDescriptor extends DatanodeInfo {
   // Stores status of decommissioning.
   // If node is not decommissioning, do not use this object for anything.
   public final DecommissioningStatus decommissioningStatus = new DecommissioningStatus();
-  
+
+  private long curBlockReportId = 0;
+
+  private BitSet curBlockReportRpcsSeen = null;
+
+  public int updateBlockReportContext(BlockReportContext context) {
+    if (curBlockReportId != context.getReportId()) {
+      curBlockReportId = context.getReportId();
+      curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
+    }
+    curBlockReportRpcsSeen.set(context.getCurRpc());
+    return curBlockReportRpcsSeen.cardinality();
+  }
+
+  public void clearBlockReportContext() {
+    curBlockReportId = 0;
+    curBlockReportRpcsSeen = null;
+  }
+
   /** Block and targets pair */
   @InterfaceAudience.Private
   @InterfaceStability.Evolving
@@ -282,6 +303,34 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  static final private List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
+      ImmutableList.of();
+
+  List<DatanodeStorageInfo> removeZombieStorages() {
+    List<DatanodeStorageInfo> zombies = null;
+    synchronized (storageMap) {
+      Iterator<Map.Entry<String, DatanodeStorageInfo>> iter =
+          storageMap.entrySet().iterator();
+      while (iter.hasNext()) {
+        Map.Entry<String, DatanodeStorageInfo> entry = iter.next();
+        DatanodeStorageInfo storageInfo = entry.getValue();
+        if (storageInfo.getLastBlockReportId() != curBlockReportId) {
+          LOG.info(storageInfo.getStorageID() + " had lastBlockReportId 0x" +
+              Long.toHexString(storageInfo.getLastBlockReportId()) +
+              ", but curBlockReportId = 0x" +
+              Long.toHexString(curBlockReportId));
+          iter.remove();
+          if (zombies == null) {
+            zombies = new LinkedList<DatanodeStorageInfo>();
+          }
+          zombies.add(storageInfo);
+        }
+        storageInfo.setLastBlockReportId(0);
+      }
+    }
+    return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
+  }
+
   /**
    * Remove block from the list of blocks belonging to the data-node. Remove
    * data-node from the block.
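One subtlety in updateBlockReportContext above: when a different report id arrives (say, after a DataNode re-registers mid-report), the BitSet is discarded and counting restarts, so leftover RPCs from the abandoned report cannot complete the new one. A stand-in class mirroring that logic (hypothetical, for illustration only):

import java.util.BitSet;

// Stand-in for the per-report RPC tracking in DatanodeDescriptor.
public class ReportTrackerSketch {
  private long curReportId = 0;
  private BitSet rpcsSeen = null;

  int update(long reportId, int totalRpcs, int curRpc) {
    if (curReportId != reportId) {   // new report id: restart tracking
      curReportId = reportId;
      rpcsSeen = new BitSet(totalRpcs);
    }
    rpcsSeen.set(curRpc);            // retries of the same RPC are idempotent
    return rpcsSeen.cardinality();
  }

  public static void main(String[] args) {
    ReportTrackerSketch t = new ReportTrackerSketch();
    System.out.println(t.update(0x1L, 2, 0));  // 1: first RPC of report 0x1
    System.out.println(t.update(0x2L, 2, 1));  // 1: report 0x2 starts fresh
  }
}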

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
index c4612a3..be16a87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStorageInfo.java
@@ -115,6 +115,9 @@ public class DatanodeStorageInfo {
   private volatile BlockInfoContiguous blockList = null;
   private int numBlocks = 0;
 
+  // The ID of the last full block report which updated this storage.
+  private long lastBlockReportId = 0;
+
   /** The number of block reports received */
   private int blockReportCount = 0;
 
@@ -178,7 +181,15 @@ public class DatanodeStorageInfo {
     this.remaining = remaining;
     this.blockPoolUsed = blockPoolUsed;
   }
-  
+
+  long getLastBlockReportId() {
+    return lastBlockReportId;
+  }
+
+  void setLastBlockReportId(long lastBlockReportId) {
+    this.lastBlockReportId = lastBlockReportId;
+  }
+
   State getState() {
     return this.state;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 90f2fe6..10cce45 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@@ -434,6 +435,17 @@ class BPServiceActor implements Runnable {
     return sendImmediateIBR;
   }
 
+  private long prevBlockReportId = 0;
+
+  private long generateUniqueBlockReportId() {
+    long id = System.nanoTime();
+    if (id <= prevBlockReportId) {
+      id = prevBlockReportId + 1;
+    }
+    prevBlockReportId = id;
+    return id;
+  }
+
   /**
    * Report the list blocks to the Namenode
    * @return DatanodeCommands returned by the NN. May be null.
@@ -476,11 +488,13 @@ class BPServiceActor implements Runnable {
     int numRPCs = 0;
     boolean success = false;
     long brSendStartTime = monotonicNow();
+    long reportId = generateUniqueBlockReportId();
     try {
       if (totalBlockCount < dnConf.blockReportSplitThreshold) {
         // Below split threshold, send all reports in a single message.
         DatanodeCommand cmd = bpNamenode.blockReport(
-            bpRegistration, bpos.getBlockPoolId(), reports);
+            bpRegistration, bpos.getBlockPoolId(), reports,
+              new BlockReportContext(1, 0, reportId));
         numRPCs = 1;
         numReportsSent = reports.length;
         if (cmd != null) {
@@ -488,10 +502,11 @@ class BPServiceActor implements Runnable {
         }
       } else {
         // Send one block report per message.
-        for (StorageBlockReport report : reports) {
-          StorageBlockReport singleReport[] = { report };
+        for (int r = 0; r < reports.length; r++) {
+          StorageBlockReport singleReport[] = { reports[r] };
           DatanodeCommand cmd = bpNamenode.blockReport(
-              bpRegistration, bpos.getBlockPoolId(), singleReport);
+              bpRegistration, bpos.getBlockPoolId(), singleReport,
+              new BlockReportContext(reports.length, r, reportId));
           numReportsSent++;
           numRPCs++;
           if (cmd != null) {
@@ -507,11 +522,12 @@ class BPServiceActor implements Runnable {
       dn.getMetrics().addBlockReport(brSendCost);
       final int nCmds = cmds.size();
       LOG.info((success ? "S" : "Uns") +
-          "uccessfully sent " + numReportsSent +
-          " of " + reports.length +
-          " blockreports for " + totalBlockCount +
-          " total blocks using " + numRPCs +
-          " RPCs. This took " + brCreateCost +
+          "uccessfully sent block report 0x" +
+          Long.toHexString(reportId) + ",  containing " + reports.length +
+          " storage report(s), of which we sent " + numReportsSent + "." +
+          " The reports had " + totalBlockCount +
+          " total blocks and used " + numRPCs +
+          " RPC(s). This took " + brCreateCost +
           " msec to generate and " + brSendCost +
           " msecs for RPC and NN processing." +
           " Got back " +

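generateUniqueBlockReportId above seeds ids from System.nanoTime() but bumps past the previous id when the clock has not advanced, so ids are strictly increasing within a single actor thread. The same pattern in isolation (a sketch, not the BPServiceActor code):

// Strictly increasing ids, even if nanoTime() repeats or jumps backwards.
public class ReportIdSketch {
  private long prevId = 0;

  long next() {                      // called from one thread, like the actor
    long id = System.nanoTime();
    if (id <= prevId) {
      id = prevId + 1;               // force progress past the last id
    }
    prevId = id;
    return id;
  }

  public static void main(String[] args) {
    ReportIdSketch gen = new ReportIdSketch();
    long a = gen.next();
    long b = gen.next();
    System.out.println(b > a);       // always true
  }
}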
http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 059bd28..1788335 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -121,6 +121,7 @@ import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -1292,7 +1293,8 @@ class NameNodeRpcServer implements NamenodeProtocols {
 
   @Override // DatanodeProtocol
   public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
-      String poolId, StorageBlockReport[] reports) throws IOException {
+        String poolId, StorageBlockReport[] reports,
+        BlockReportContext context) throws IOException {
     checkNNStartup();
     verifyRequest(nodeReg);
     if(blockStateChangeLog.isDebugEnabled()) {
@@ -1301,14 +1303,15 @@ class NameNodeRpcServer implements NamenodeProtocols {
     }
     final BlockManager bm = namesystem.getBlockManager(); 
     boolean noStaleStorages = false;
-    for(StorageBlockReport r : reports) {
-      final BlockListAsLongs blocks = r.getBlocks();
+    for (int r = 0; r < reports.length; r++) {
+      final BlockListAsLongs blocks = reports[r].getBlocks();
       //
       // BlockManager.processReport accumulates information of prior calls
       // for the same node and storage, so the value returned by the last
       // call of this loop is the final updated value for noStaleStorage.
       //
-      noStaleStorages = bm.processReport(nodeReg, r.getStorage(), blocks);
+      noStaleStorages = bm.processReport(nodeReg, reports[r].getStorage(),
+          blocks, context, (r == reports.length - 1));
       metrics.incrStorageBlockReportOps();
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
new file mode 100644
index 0000000..a084a81
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.protocol;
+
+/**
+ * The context of the block report.
+ *
+ * This is a set of fields that the Datanode sends to provide context about a
+ * block report RPC.  The context includes a unique 64-bit ID which
+ * identifies the block report as a whole.  It also includes the total number
+ * of RPCs which this block report is split into, and the index into that
+ * total for the current RPC.
+ */
+public class BlockReportContext {
+  private final int totalRpcs;
+  private final int curRpc;
+  private final long reportId;
+
+  public BlockReportContext(int totalRpcs, int curRpc, long reportId) {
+    this.totalRpcs = totalRpcs;
+    this.curRpc = curRpc;
+    this.reportId = reportId;
+  }
+
+  public int getTotalRpcs() {
+    return totalRpcs;
+  }
+
+  public int getCurRpc() {
+    return curRpc;
+  }
+
+  public long getReportId() {
+    return reportId;
+  }
+}
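Tying this back to the BPServiceActor change above, a hedged usage sketch (reportId and reports[] assumed from the surrounding actor code): a single-message report uses one context, while a split report shares the report id across per-storage RPCs.

// Single message: the whole report is RPC 0 of 1.
BlockReportContext single = new BlockReportContext(1, 0, reportId);

// Split report: one context per RPC, same report id throughout.
for (int r = 0; r < reports.length; r++) {
  BlockReportContext part =
      new BlockReportContext(reports.length, r, reportId);
  // send reports[r] together with 'part'
}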

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 047de56..a3b6004 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -23,7 +23,6 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -128,20 +127,23 @@ public interface DatanodeProtocol {
    *     Each finalized block is represented as 3 longs. Each under-
    *     construction replica is represented as 4 longs.
    *     This is done instead of Block[] to reduce memory used by block reports.
-   *     
+   * @param reports report of blocks per storage
+   * @param context Context information for this block report.
+   *
    * @return - the next command for DN to process.
    * @throws IOException
    */
   @Idempotent
   public DatanodeCommand blockReport(DatanodeRegistration registration,
-      String poolId, StorageBlockReport[] reports) throws IOException;
+            String poolId, StorageBlockReport[] reports,
+            BlockReportContext context) throws IOException;
     
 
   /**
    * Communicates the complete list of locally cached blocks to the NameNode.
    * 
    * This method is similar to
-   * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])},
+   * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[], BlockReportContext)},
    * which is used to communicated blocks stored on disk.
    *
    * @param            The datanode registration.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index 7b3a4a9..3083dc9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -224,11 +224,25 @@ message HeartbeatResponseProto {
  *                second long represents length
  *                third long represents gen stamp
  *                fourth long (if under construction) represents replica state
+ * context      - An optional field containing information about the context
+ *                of this block report.
  */
 message BlockReportRequestProto {
   required DatanodeRegistrationProto registration = 1;
   required string blockPoolId = 2;
   repeated StorageBlockReportProto reports = 3;
+  optional BlockReportContextProto context = 4;
+}
+
+message BlockReportContextProto  {
+  // The total number of RPCs this block report is broken into.
+  required int32 totalRpcs = 1;
+
+  // The index of the current RPC (zero-based)
+  required int32 curRpc = 2;
+
+  // The unique 64-bit ID of this block report
+  required int64 id = 3;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
index bebde18..f0dab4c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -219,7 +220,8 @@ public class TestBlockListAsLongs {
     // check DN sends new-style BR
     request.set(null);
     nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
-    nn.blockReport(reg, "pool", sbr);
+    nn.blockReport(reg, "pool", sbr,
+        new BlockReportContext(1, 0, System.nanoTime()));
     BlockReportRequestProto proto = request.get();
     assertNotNull(proto);
     assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@@ -228,7 +230,8 @@ public class TestBlockListAsLongs {
     // back up to prior version and check DN sends old-style BR
     request.set(null);
     nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
-    nn.blockReport(reg, "pool", sbr);
+    nn.blockReport(reg, "pool", sbr,
+        new BlockReportContext(1, 0, System.nanoTime()));
     proto = request.get();
     assertNotNull(proto);
     assertFalse(proto.getReports(0).getBlocksList().isEmpty());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
index d9ac9e5..707c780 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
@@ -555,12 +555,12 @@ public class TestBlockManager {
     reset(node);
     
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY);
+        BlockListAsLongs.EMPTY, null, false);
     assertEquals(1, ds.getBlockReportCount());
     // send block report again, should NOT be processed
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY);
+        BlockListAsLongs.EMPTY, null, false);
     assertEquals(1, ds.getBlockReportCount());
 
     // re-register as if node restarted, should update existing node
@@ -571,7 +571,7 @@ public class TestBlockManager {
     // send block report, should be processed after restart
     reset(node);
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-                     BlockListAsLongs.EMPTY);
+                     BlockListAsLongs.EMPTY, null, false);
     // Reinitialize as registration with empty storage list pruned
     // node.storageMap.
     ds = node.getStorageInfos()[0];
@@ -600,7 +600,7 @@ public class TestBlockManager {
     reset(node);
     doReturn(1).when(node).numBlocks();
     bm.processReport(node, new DatanodeStorage(ds.getStorageID()),
-        BlockListAsLongs.EMPTY);
+        BlockListAsLongs.EMPTY, null, false);
     assertEquals(1, ds.getBlockReportCount());
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
index b67ae7a..4b97d01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java
@@ -18,26 +18,40 @@
 
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math3.stat.inference.TestUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
 
 import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
 
 
 public class TestNameNodePrunesMissingStorages {
@@ -110,7 +124,9 @@ public class TestNameNodePrunesMissingStorages {
   }
 
   /**
-   * Verify that the NameNode does not prune storages with blocks.
+   * Verify that the NameNode does not prune storages with blocks
+   * simply as a result of a heartbeat being sent missing that storage.
+   *
    * @throws IOException
    */
   @Test (timeout=300000)
@@ -118,4 +134,119 @@ public class TestNameNodePrunesMissingStorages {
    // Run the test with 1 storage, after the test still expect 1 storage.
     runTest(GenericTestUtils.getMethodName(), true, 1, 1);
   }
+
+  /**
+   * Regression test for HDFS-7960.<p/>
+   *
+   * Shutting down a datanode, removing a storage directory, and restarting
+   * the DataNode should not produce zombie storages.
+   */
+  @Test(timeout=300000)
+  public void testRemovingStorageDoesNotProduceZombies() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1);
+    final int NUM_STORAGES_PER_DN = 2;
+    final MiniDFSCluster cluster = new MiniDFSCluster
+        .Builder(conf).numDataNodes(3)
+        .storagesPerDatanode(NUM_STORAGES_PER_DN)
+        .build();
+    try {
+      cluster.waitActive();
+      for (DataNode dn : cluster.getDataNodes()) {
+        assertEquals(NUM_STORAGES_PER_DN,
+          cluster.getNamesystem().getBlockManager().
+              getDatanodeManager().getDatanode(dn.getDatanodeId()).
+              getStorageInfos().length);
+      }
+      // Create a file which will end up on all 3 datanodes.
+      final Path TEST_PATH = new Path("/foo1");
+      DistributedFileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, TEST_PATH, 1024, (short) 3, 0xcafecafe);
+      for (DataNode dn : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(dn);
+      }
+      ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/foo1"));
+      cluster.getNamesystem().writeLock();
+      final String storageIdToRemove;
+      String datanodeUuid;
+      // Find the first storage which this block is in.
+      try {
+        Iterator<DatanodeStorageInfo> storageInfoIter =
+            cluster.getNamesystem().getBlockManager().
+                getStorages(block.getLocalBlock()).iterator();
+        assertTrue(storageInfoIter.hasNext());
+        DatanodeStorageInfo info = storageInfoIter.next();
+        storageIdToRemove = info.getStorageID();
+        datanodeUuid = info.getDatanodeDescriptor().getDatanodeUuid();
+      } finally {
+        cluster.getNamesystem().writeUnlock();
+      }
+      // Find the DataNode which holds that first storage.
+      final DataNode datanodeToRemoveStorageFrom;
+      int datanodeToRemoveStorageFromIdx = 0;
+      while (true) {
+        if (datanodeToRemoveStorageFromIdx >= cluster.getDataNodes().size()) {
+          Assert.fail("failed to find datanode with uuid " + datanodeUuid);
+          datanodeToRemoveStorageFrom = null;
+          break;
+        }
+        DataNode dn = cluster.getDataNodes().
+            get(datanodeToRemoveStorageFromIdx);
+        if (dn.getDatanodeUuid().equals(datanodeUuid)) {
+          datanodeToRemoveStorageFrom = dn;
+          break;
+        }
+        datanodeToRemoveStorageFromIdx++;
+      }
+      // Find the volume within the datanode which holds that first storage.
+      List<? extends FsVolumeSpi> volumes =
+          datanodeToRemoveStorageFrom.getFSDataset().getVolumes();
+      assertEquals(NUM_STORAGES_PER_DN, volumes.size());
+      String volumeDirectoryToRemove = null;
+      for (FsVolumeSpi volume : volumes) {
+        if (volume.getStorageID().equals(storageIdToRemove)) {
+          volumeDirectoryToRemove = volume.getBasePath();
+        }
+      }
+      // Shut down the datanode and remove the volume.
+      // Replace the volume directory with a regular file, which will
+      // cause a volume failure.  (If we merely removed the directory,
+      // it would be re-initialized with a new storage ID.)
+      assertNotNull(volumeDirectoryToRemove);
+      datanodeToRemoveStorageFrom.shutdown();
+      FileUtil.fullyDelete(new File(volumeDirectoryToRemove));
+      FileOutputStream fos = new FileOutputStream(volumeDirectoryToRemove);
+      try {
+        fos.write(1);
+      } finally {
+        fos.close();
+      }
+      cluster.restartDataNode(datanodeToRemoveStorageFromIdx);
+      // Wait for the NameNode to remove the storage.
+      LOG.info("waiting for the datanode to remove " + storageIdToRemove);
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          final DatanodeDescriptor dnDescriptor =
+              cluster.getNamesystem().getBlockManager().getDatanodeManager().
+                  getDatanode(datanodeToRemoveStorageFrom.getDatanodeUuid());
+          assertNotNull(dnDescriptor);
+          DatanodeStorageInfo[] infos = dnDescriptor.getStorageInfos();
+          for (DatanodeStorageInfo info : infos) {
+            if (info.getStorageID().equals(storageIdToRemove)) {
+              LOG.info("Still found storage " + storageIdToRemove + " on " +
+                  info + ".");
+              return false;
+            }
+          }
+          assertEquals(NUM_STORAGES_PER_DN - 1, infos.length);
+          return true;
+        }
+      }, 10, 30000);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index de66db5..c4a2d06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -613,7 +614,8 @@ public abstract class BlockReportTestBase {
         .when(spy).blockReport(
           Mockito.<DatanodeRegistration>anyObject(),
           Mockito.anyString(),
-          Mockito.<StorageBlockReport[]>anyObject());
+          Mockito.<StorageBlockReport[]>anyObject(),
+          Mockito.<BlockReportContext>anyObject());
 
       // Force a block report to be generated. The block report will have
       // an RBW replica in it. Wait for the RPC to be sent, but block

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
index bc49793..3aa9a7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -216,7 +217,8 @@ public class TestBPOfferService {
         .when(mockNN2).blockReport(
             Mockito.<DatanodeRegistration>anyObject(),  
             Mockito.eq(FAKE_BPID),
-            Mockito.<StorageBlockReport[]>anyObject());
+            Mockito.<StorageBlockReport[]>anyObject(),
+            Mockito.<BlockReportContext>anyObject());
 
     bpos.start();
     try {
@@ -406,7 +408,8 @@ public class TestBPOfferService {
           Mockito.verify(mockNN).blockReport(
               Mockito.<DatanodeRegistration>anyObject(),  
               Mockito.eq(FAKE_BPID),
-              Mockito.<StorageBlockReport[]>anyObject());
+              Mockito.<StorageBlockReport[]>anyObject(),
+              Mockito.<BlockReportContext>anyObject());
           return true;
         } catch (Throwable t) {
           LOG.info("waiting on block report: " + t.getMessage());
@@ -431,7 +434,8 @@ public class TestBPOfferService {
           Mockito.verify(mockNN).blockReport(
                   Mockito.<DatanodeRegistration>anyObject(),
                   Mockito.eq(FAKE_BPID),
-                  Mockito.<StorageBlockReport[]>anyObject());
+                  Mockito.<StorageBlockReport[]>anyObject(),
+                  Mockito.<BlockReportContext>anyObject());
           return true;
         } catch (Throwable t) {
           LOG.info("waiting on block report: " + t.getMessage());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
index 3238d6a..c47209e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
@@ -122,7 +123,8 @@ public class TestBlockHasMultipleReplicasOnSameDN {
     }
 
     // Should not assert!
-    cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports);
+    cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
+        new BlockReportContext(1, 0, System.nanoTime()));
 
     // Get the block locations once again.
     locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
index 0428b81..41e8d7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
@@ -185,7 +186,7 @@ public class TestDataNodeVolumeFailure {
             new StorageBlockReport(dnStorage, blockList);
     }
     
-    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports);
+    cluster.getNameNodeRpc().blockReport(dnR, bpid, reports, null);
 
     // verify number of blocks and files...
     verify(filename, filesize);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index ac7ebc0..cab50b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
@@ -136,7 +137,8 @@ public class TestDatanodeProtocolRetryPolicy {
           Mockito.verify(mockNN).blockReport(
               Mockito.eq(datanodeRegistration),
               Mockito.eq(POOL_ID),
-              Mockito.<StorageBlockReport[]>anyObject());
+              Mockito.<StorageBlockReport[]>anyObject(),
+              Mockito.<BlockReportContext>anyObject());
           return true;
         } catch (Throwable t) {
           LOG.info("waiting on block report: " + t.getMessage());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
index a5e4d4e..aadd9b2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDnRespectsBlockReportSplitThreshold.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.*;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY;
@@ -133,7 +134,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture());
+        captor.capture(), Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
   }
@@ -165,7 +166,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(1)).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture());
+        captor.capture(), Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, cluster.getStoragesPerDatanode(), BLOCKS_IN_FILE);
   }
@@ -197,7 +198,7 @@ public class TestDnRespectsBlockReportSplitThreshold {
     Mockito.verify(nnSpy, times(cluster.getStoragesPerDatanode())).blockReport(
         any(DatanodeRegistration.class),
         anyString(),
-        captor.capture());
+        captor.capture(), Mockito.<BlockReportContext>anyObject());
 
     verifyCapturedArguments(captor, 1, BLOCKS_IN_FILE);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index 1b03786..b150b0d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
+import org.apache.hadoop.util.Time;
 
 
 /**
@@ -33,10 +35,13 @@ public class TestNNHandlesBlockReportPerStorage extends BlockReportTestBase {
   @Override
   protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
       StorageBlockReport[] reports) throws IOException {
+    int i = 0;
     for (StorageBlockReport report : reports) {
       LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
       StorageBlockReport[] singletonReport = { report };
-      cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport);
+      cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
+          new BlockReportContext(reports.length, i, System.nanoTime()));
+      i++;
     }
   }
 }
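
Reading the arguments off the call above: BlockReportContext carries (the number of RPCs making up this block report, the zero-based index of this RPC, a report id), which is presumably how the NameNode recognizes the final RPC of a split report before pruning zombie storages. A standalone sketch of the per-storage pattern, assuming a single id is meant to be shared across all RPCs of one report (the test above draws a fresh System.nanoTime() per iteration instead); names are illustrative:

import java.io.IOException;

import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;

public class SplitBlockReportSketch {
  // Send one RPC per storage, tagging each with its position in the
  // overall report: (total RPCs, index of this RPC, shared report id).
  static void sendPerStorage(DatanodeProtocol nn, DatanodeRegistration dnR,
      String poolId, StorageBlockReport[] reports) throws IOException {
    long reportId = System.nanoTime();  // assumption: one id for the whole report
    for (int i = 0; i < reports.length; i++) {
      StorageBlockReport[] singleton = { reports[i] };
      nn.blockReport(dnR, poolId, singleton,
          new BlockReportContext(reports.length, i, reportId));
    }
  }
}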

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
index 036b550..dca3c88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 
@@ -34,6 +35,7 @@ public class TestNNHandlesCombinedBlockReport extends BlockReportTestBase {
   protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
                                   StorageBlockReport[] reports) throws IOException {
     LOG.info("Sending combined block reports for " + dnR);
-    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
+        new BlockReportContext(1, 0, System.nanoTime()));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
index efb9d98..3195d7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
@@ -76,7 +77,8 @@ public final class TestTriggerBlockReport {
       Mockito.verify(spy, times(0)).blockReport(
           any(DatanodeRegistration.class),
           anyString(),
-          any(StorageBlockReport[].class));
+          any(StorageBlockReport[].class),
+          Mockito.<BlockReportContext>anyObject());
       Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
           any(DatanodeRegistration.class),
           anyString(),
@@ -113,7 +115,8 @@ public final class TestTriggerBlockReport {
       Mockito.verify(spy, timeout(60000)).blockReport(
           any(DatanodeRegistration.class),
           anyString(),
-          any(StorageBlockReport[].class));
+          any(StorageBlockReport[].class),
+          Mockito.<BlockReportContext>anyObject());
     }
 
     cluster.shutdown();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index bc3c6b5..9e24f72 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -939,7 +940,8 @@ public class NNThroughputBenchmark implements Tool {
           new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
       };
       nameNodeProto.blockReport(dnRegistration, 
-          nameNode.getNamesystem().getBlockPoolId(), reports);
+          nameNode.getNamesystem().getBlockPoolId(), reports,
+          new BlockReportContext(1, 0, System.nanoTime()));
     }
 
     /**
@@ -1184,8 +1186,9 @@ public class NNThroughputBenchmark implements Tool {
       long start = Time.now();
       StorageBlockReport[] report = { new StorageBlockReport(
           dn.storage, dn.getBlockReportList()) };
-      nameNodeProto.blockReport(dn.dnRegistration, nameNode.getNamesystem()
-          .getBlockPoolId(), report);
+      nameNodeProto.blockReport(dn.dnRegistration,
+          nameNode.getNamesystem().getBlockPoolId(), report,
+          new BlockReportContext(1, 0, System.nanoTime()));
       long end = Time.now();
       return end-start;
     }
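
For contrast with the split-report loop earlier, the benchmark hunks use the single-RPC convention: a combined report is one RPC, so its context is built as (totalRpcs=1, curRpc=0, id). A hedged sketch of the timing pattern the benchmark follows, with illustrative names:

import java.io.IOException;

import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.util.Time;

public class BlockReportTimingSketch {
  // Time one combined (single-RPC) block report, mirroring the benchmark:
  // context (1, 0, id) says "this report is 1 RPC, and this is RPC 0".
  static long timeCombinedReport(DatanodeProtocol nn, DatanodeRegistration reg,
      String poolId, DatanodeStorage storage) throws IOException {
    StorageBlockReport[] report = {
        new StorageBlockReport(storage, BlockListAsLongs.EMPTY) };
    long start = Time.now();
    nn.blockReport(reg, poolId, report,
        new BlockReportContext(1, 0, System.nanoTime()));
    return Time.now() - start;
  }
}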

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index ee80b33..92c329e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -107,7 +108,8 @@ public class TestDeadDatanode {
         new DatanodeStorage(reg.getDatanodeUuid()),
         BlockListAsLongs.EMPTY) };
     try {
-      dnp.blockReport(reg, poolId, report);
+      dnp.blockReport(reg, poolId, report,
+          new BlockReportContext(1, 0, System.nanoTime()));
       fail("Expected IOException is not thrown");
     } catch (IOException ex) {
       // Expected

http://git-wip-us.apache.org/repos/asf/hadoop/blob/50ee8f4e/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
index fa7a307..74358bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
 import org.apache.hadoop.io.IOUtils;
@@ -547,7 +548,8 @@ public class TestDNFencing {
         .when(spy).blockReport(
           Mockito.<DatanodeRegistration>anyObject(),
           Mockito.anyString(),
-          Mockito.<StorageBlockReport[]>anyObject());
+          Mockito.<StorageBlockReport[]>anyObject(),
+          Mockito.<BlockReportContext>anyObject());
       dn.scheduleAllBlockReport(0);
       delayer.waitForCall();
       

