hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cmcc...@apache.org
Subject svn commit: r1532116 - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ src/main/java/org/apache/hadoop/hdfs/server/da...
Date Mon, 14 Oct 2013 22:19:11 GMT
Author: cmccabe
Date: Mon Oct 14 22:19:10 2013
New Revision: 1532116

URL: http://svn.apache.org/r1532116
Log:
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only (cmccabe)

Added:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java
Modified:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt?rev=1532116&r1=1532115&r2=1532116&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
(original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
Mon Oct 14 22:19:10 2013
@@ -61,6 +61,7 @@ HDFS-4949 (Unreleased)
     String. (cnauroth)
 
   OPTIMIZATIONS
+    HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 
   BUG FIXES
     HDFS-5169. hdfs.c: translateZCRException: null pointer deref when

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1532116&r1=1532115&r2=1532116&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
(original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
Mon Oct 14 22:19:10 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
@@ -119,6 +120,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -696,6 +698,8 @@ public class PBHelper {
       return PBHelper.convert(proto.getKeyUpdateCmd());
     case RegisterCommand:
       return REG_CMD;
+    case BlockIdCommand:
+      return PBHelper.convert(proto.getBlkIdCmd());
     }
     return null;
   }
@@ -738,12 +742,6 @@ public class PBHelper {
     case DatanodeProtocol.DNA_SHUTDOWN:
       builder.setAction(BlockCommandProto.Action.SHUTDOWN);
       break;
-    case DatanodeProtocol.DNA_CACHE:
-      builder.setAction(BlockCommandProto.Action.CACHE);
-      break;
-    case DatanodeProtocol.DNA_UNCACHE:
-      builder.setAction(BlockCommandProto.Action.UNCACHE);
-      break;
     default:
       throw new AssertionError("Invalid action");
     }
@@ -754,6 +752,26 @@ public class PBHelper {
     builder.addAllTargets(PBHelper.convert(cmd.getTargets()));
     return builder.build();
   }
+  
+  public static BlockIdCommandProto convert(BlockIdCommand cmd) {
+    BlockIdCommandProto.Builder builder = BlockIdCommandProto.newBuilder()
+        .setBlockPoolId(cmd.getBlockPoolId());
+    switch (cmd.getAction()) {
+    case DatanodeProtocol.DNA_CACHE:
+      builder.setAction(BlockIdCommandProto.Action.CACHE);
+      break;
+    case DatanodeProtocol.DNA_UNCACHE:
+      builder.setAction(BlockIdCommandProto.Action.UNCACHE);
+      break;
+    default:
+      throw new AssertionError("Invalid action");
+    }
+    long[] blockIds = cmd.getBlockIds();
+    for (int i = 0; i < blockIds.length; i++) {
+      builder.addBlockIds(blockIds[i]);
+    }
+    return builder.build();
+  }
 
   private static List<DatanodeInfosProto> convert(DatanodeInfo[][] targets) {
     DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
@@ -796,11 +814,14 @@ public class PBHelper {
       break;
     case DatanodeProtocol.DNA_TRANSFER:
     case DatanodeProtocol.DNA_INVALIDATE:
+    case DatanodeProtocol.DNA_SHUTDOWN:
+      builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).
+        setBlkCmd(PBHelper.convert((BlockCommand) datanodeCommand));
+      break;
     case DatanodeProtocol.DNA_CACHE:
     case DatanodeProtocol.DNA_UNCACHE:
-    case DatanodeProtocol.DNA_SHUTDOWN:
-      builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
-          PBHelper.convert((BlockCommand) datanodeCommand));
+      builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand).
+        setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand));
       break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
@@ -851,6 +872,20 @@ public class PBHelper {
     case SHUTDOWN:
       action = DatanodeProtocol.DNA_SHUTDOWN;
       break;
+    default:
+      throw new AssertionError("Unknown action type: " + blkCmd.getAction());
+    }
+    return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
+  }
+
+  public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
+    int numBlockIds = blkIdCmd.getBlockIdsCount();
+    long blockIds[] = new long[numBlockIds];
+    for (int i = 0; i < numBlockIds; i++) {
+      blockIds[i] = blkIdCmd.getBlockIds(i);
+    }
+    int action = DatanodeProtocol.DNA_UNKNOWN;
+    switch (blkIdCmd.getAction()) {
     case CACHE:
       action = DatanodeProtocol.DNA_CACHE;
       break;
@@ -858,9 +893,9 @@ public class PBHelper {
       action = DatanodeProtocol.DNA_UNCACHE;
       break;
     default:
-      throw new AssertionError("Unknown action type: " + blkCmd.getAction());
+      throw new AssertionError("Unknown action type: " + blkIdCmd.getAction());
     }
-    return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
+    return new BlockIdCommand(action, blkIdCmd.getBlockPoolId(), blockIds);
   }
 
   public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1532116&r1=1532115&r2=1532116&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
(original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
Mon Oct 14 22:19:10 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
@@ -1308,14 +1309,22 @@ public class DatanodeManager {
         // Check pending caching
         List<Block> pendingCacheList = nodeinfo.getCacheBlocks();
         if (pendingCacheList != null) {
-          cmds.add(new BlockCommand(DatanodeProtocol.DNA_CACHE, blockPoolId,
-              pendingCacheList.toArray(new Block[] {})));
+          long blockIds[] = new long[pendingCacheList.size()];
+          for (int i = 0; i < pendingCacheList.size(); i++) {
+            blockIds[i] = pendingCacheList.get(i).getBlockId();
+          }
+          cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_CACHE, blockPoolId,
+              blockIds));
         }
         // Check cached block invalidation
         blks = nodeinfo.getInvalidateCacheBlocks();
         if (blks != null) {
-          cmds.add(new BlockCommand(DatanodeProtocol.DNA_UNCACHE,
-              blockPoolId, blks));
+          long blockIds[] = new long[blks.length];
+          for (int i = 0; i < blks.length; i++) {
+            blockIds[i] = blks[i].getBlockId();
+          }
+          cmds.add(new BlockIdCommand(DatanodeProtocol.DNA_UNCACHE,
+              blockPoolId, blockIds));
         }
 
         blockManager.addKeyUpdateCommand(cmds, nodeinfo);

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1532116&r1=1532115&r2=1532116&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
(original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
Mon Oct 14 22:19:10 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -518,6 +519,8 @@ class BPOfferService {
       return true;
     final BlockCommand bcmd = 
       cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+    final BlockIdCommand blockIdCmd = 
+      cmd instanceof BlockIdCommand ? (BlockIdCommand)cmd: null;
 
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_TRANSFER:
@@ -545,13 +548,13 @@ class BPOfferService {
       break;
     case DatanodeProtocol.DNA_CACHE:
       LOG.info("DatanodeCommand action: DNA_CACHE");
-      dn.getFSDataset().cache(bcmd.getBlockPoolId(), bcmd.getBlocks());
-      dn.metrics.incrBlocksCached(bcmd.getBlocks().length);
+      dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+      dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
       break;
     case DatanodeProtocol.DNA_UNCACHE:
       LOG.info("DatanodeCommand action: DNA_UNCACHE");
-      dn.getFSDataset().uncache(bcmd.getBlockPoolId(), bcmd.getBlocks());
-      dn.metrics.incrBlocksUncached(bcmd.getBlocks().length);
+      dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+      dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1532116&r1=1532115&r2=1532116&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
(original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
Mon Oct 14 22:19:10 2013
@@ -305,16 +305,16 @@ public interface FsDatasetSpi<V extends 
   /**
    * Caches the specified blocks
    * @param bpid Block pool id
-   * @param cacheBlks - block to cache
+   * @param blockIds - block ids to cache
    */
-  public void cache(String bpid, Block[] cacheBlks);
+  public void cache(String bpid, long[] blockIds);
 
   /**
    * Uncaches the specified blocks
    * @param bpid Block pool id
-   * @param uncacheBlks - blocks to uncache
+   * @param blockIds - blocks ids to uncache
    */
-  public void uncache(String bpid, Block[] uncacheBlks);
+  public void uncache(String bpid, long[] blockIds);
 
     /**
      * Check if all the data directories are healthy

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1532116&r1=1532115&r2=1532116&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
(original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
Mon Oct 14 22:19:10 2013
@@ -91,8 +91,8 @@ public class FsDatasetCache {
   /**
    * @return if the block is cached
    */
-  boolean isCached(String bpid, Block block) {
-    MappableBlock mapBlock = cachedBlocks.get(block.getBlockId());
+  boolean isCached(String bpid, long blockId) {
+    MappableBlock mapBlock = cachedBlocks.get(blockId);
     if (mapBlock != null) {
       return mapBlock.getBlockPoolId().equals(bpid);
     }
@@ -127,7 +127,7 @@ public class FsDatasetCache {
    */
   void cacheBlock(String bpid, Block block, FsVolumeImpl volume,
       FileInputStream blockIn, FileInputStream metaIn) {
-    if (isCached(bpid, block)) {
+    if (isCached(bpid, block.getBlockId())) {
       return;
     }
     MappableBlock mapBlock = null;
@@ -166,23 +166,23 @@ public class FsDatasetCache {
 
   /**
    * Uncaches a block if it is cached.
-   * @param block to uncache
+   * @param blockId id to uncache
    */
-  void uncacheBlock(String bpid, Block block) {
-    MappableBlock mapBlock = cachedBlocks.get(block.getBlockId());
+  void uncacheBlock(String bpid, long blockId) {
+    MappableBlock mapBlock = cachedBlocks.get(blockId);
     if (mapBlock != null &&
         mapBlock.getBlockPoolId().equals(bpid) &&
-        mapBlock.getBlock().equals(block)) {
+        mapBlock.getBlock().getBlockId() == blockId) {
       mapBlock.close();
-      cachedBlocks.remove(block.getBlockId());
+      cachedBlocks.remove(blockId);
       long bytes = mapBlock.getNumBytes();
       long used = usedBytes.get();
       while (!usedBytes.compareAndSet(used, used - bytes)) {
         used = usedBytes.get();
       }
-      LOG.info("Successfully uncached block " + block);
+      LOG.info("Successfully uncached block " + blockId);
     } else {
-      LOG.info("Could not uncache block " + block + ": unknown block.");
+      LOG.info("Could not uncache block " + blockId + ": unknown block.");
     }
   }
 
@@ -215,7 +215,8 @@ public class FsDatasetCache {
       // If we failed or the block became uncacheable in the meantime,
       // clean up and return the reserved cache allocation 
       if (!success || 
-          !dataset.validToCache(block.getBlockPoolId(), block.getBlock())) {
+          !dataset.validToCache(block.getBlockPoolId(),
+              block.getBlock().getBlockId())) {
         block.close();
         long used = usedBytes.get();
         while (!usedBytes.compareAndSet(used, used-block.getNumBytes())) {

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1532116&r1=1532115&r2=1532116&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
(original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
Mon Oct 14 22:19:10 2013
@@ -562,7 +562,7 @@ class FsDatasetImpl implements FsDataset
       FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
     // uncache the block
-    cacheManager.uncacheBlock(bpid, replicaInfo);
+    cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
     // unlink the finalized replica
     replicaInfo.unlinkBlock(1);
     
@@ -1178,7 +1178,7 @@ class FsDatasetImpl implements FsDataset
       }
 
       // Uncache the block synchronously
-      cacheManager.uncacheBlock(bpid, invalidBlks[i]);
+      cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
       // Delete the block asynchronously to make sure we can do it fast enough
       asyncDiskService.deleteAsync(v, f,
           FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
@@ -1189,20 +1189,22 @@ class FsDatasetImpl implements FsDataset
     }
   }
 
-  synchronized boolean validToCache(String bpid, Block blk) {
-    ReplicaInfo info = volumeMap.get(bpid, blk);
+  synchronized boolean validToCache(String bpid, long blockId) {
+    ReplicaInfo info = volumeMap.get(bpid, blockId);
     if (info == null) {
-      LOG.warn("Failed to cache replica " + blk + ": ReplicaInfo not found.");
+      LOG.warn("Failed to cache replica in block pool " + bpid +
+          " with block id " + blockId + ": ReplicaInfo not found.");
       return false;
     }
     FsVolumeImpl volume = (FsVolumeImpl)info.getVolume();
     if (volume == null) {
-      LOG.warn("Failed to cache replica " + blk + ": Volume not found.");
+      LOG.warn("Failed to cache block with id " + blockId +
+          ": Volume not found.");
       return false;
     }
     if (info.getState() != ReplicaState.FINALIZED) {
-      LOG.warn("Failed to cache replica " + blk + ": Replica is not"
-          + " finalized.");
+      LOG.warn("Failed to block with id " + blockId + 
+          ": Replica is not finalized.");
       return false;
     }
     return true;
@@ -1211,31 +1213,33 @@ class FsDatasetImpl implements FsDataset
   /**
    * Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
    */
-  private void cacheBlock(String bpid, Block blk) {
+  private void cacheBlock(String bpid, long blockId) {
     ReplicaInfo info;
     FsVolumeImpl volume;
     synchronized (this) {
-      if (!validToCache(bpid, blk)) {
+      if (!validToCache(bpid, blockId)) {
         return;
       }
-      info = volumeMap.get(bpid, blk);
+      info = volumeMap.get(bpid, blockId);
       volume = (FsVolumeImpl)info.getVolume();
     }
     // Try to open block and meta streams
     FileInputStream blockIn = null;
     FileInputStream metaIn = null;
     boolean success = false;
+    ExtendedBlock extBlk =
+        new ExtendedBlock(bpid, blockId,
+            info.getBytesOnDisk(), info.getGenerationStamp());
     try {
-      ExtendedBlock extBlk = new ExtendedBlock(bpid, blk);
       blockIn = (FileInputStream)getBlockInputStream(extBlk, 0);
       metaIn = (FileInputStream)getMetaDataInputStream(extBlk)
           .getWrappedStream();
       success = true;
     } catch (ClassCastException e) {
-      LOG.warn("Failed to cache replica " + blk + ": Underlying blocks"
+      LOG.warn("Failed to cache replica " + extBlk + ": Underlying blocks"
           + " are not backed by files.", e);
     } catch (IOException e) {
-      LOG.warn("Failed to cache replica " + blk + ": IOException while"
+      LOG.warn("Failed to cache replica " + extBlk + ": IOException while"
           + " trying to open block or meta files.", e);
     }
     if (!success) {
@@ -1243,21 +1247,21 @@ class FsDatasetImpl implements FsDataset
       IOUtils.closeQuietly(metaIn);
       return;
     }
-    cacheManager.cacheBlock(bpid, blk, volume, blockIn, metaIn);
+    cacheManager.cacheBlock(bpid, extBlk.getLocalBlock(),
+        volume, blockIn, metaIn);
   }
 
   @Override // FsDatasetSpi
-  public void cache(String bpid, Block[] cacheBlks) {
-    for (int i=0; i<cacheBlks.length; i++) {
-      cacheBlock(bpid, cacheBlks[i]);
+  public void cache(String bpid, long[] blockIds) {
+    for (int i=0; i < blockIds.length; i++) {
+      cacheBlock(bpid, blockIds[i]);
     }
   }
 
   @Override // FsDatasetSpi
-  public void uncache(String bpid, Block[] uncacheBlks) {
-    for (int i=0; i<uncacheBlks.length; i++) {
-      Block blk = uncacheBlks[i];
-      cacheManager.uncacheBlock(bpid, blk);
+  public void uncache(String bpid, long[] blockIds) {
+    for (int i=0; i < blockIds.length; i++) {
+      cacheManager.uncacheBlock(bpid, blockIds[i]);
     }
   }
 

Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java?rev=1532116&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java
(added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java
Mon Oct 14 22:19:10 2013
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/****************************************************
+ * A BlockIdCommand is an instruction to a datanode 
+ * regarding some blocks under its control.
+ ****************************************************/
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class BlockIdCommand extends DatanodeCommand {
+  final String poolId;
+  final long blockIds[];
+
+  /**
+   * Create BlockCommand for the given action
+   * @param blocks blocks related to the action
+   */
+  public BlockIdCommand(int action, String poolId, long[] blockIds) {
+    super(action);
+    this.poolId = poolId;
+    this.blockIds= blockIds;
+  }
+  
+  public String getBlockPoolId() {
+    return poolId;
+  }
+  
+  public long[] getBlockIds() {
+    return blockIds;
+  }
+}

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1532116&r1=1532115&r2=1532116&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
(original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
Mon Oct 14 22:19:10 2013
@@ -70,6 +70,7 @@ message DatanodeCommandProto {
     RegisterCommand = 5;
     UnusedUpgradeCommand = 6;
     NullDatanodeCommand = 7;
+    BlockIdCommand = 8;
   }
 
   required Type cmdType = 1;    // Type of the command
@@ -82,6 +83,7 @@ message DatanodeCommandProto {
   optional FinalizeCommandProto finalizeCmd = 5;
   optional KeyUpdateCommandProto keyUpdateCmd = 6;
   optional RegisterCommandProto registerCmd = 7;
+  optional BlockIdCommandProto blkIdCmd = 8;
 }
 
 /**
@@ -103,8 +105,6 @@ message BlockCommandProto {
     TRANSFER = 1;   // Transfer blocks to another datanode
     INVALIDATE = 2; // Invalidate blocks
     SHUTDOWN = 3;   // Shutdown the datanode
-    CACHE = 4;      // Cache blocks on the datanode
-    UNCACHE = 5;    // Uncache blocks on the datanode
   }
   required Action action = 1;
   required string blockPoolId = 2;
@@ -113,6 +113,20 @@ message BlockCommandProto {
 }
 
 /**
+ * Command to instruct datanodes to perform certain action
+ * on the given set of block IDs.
+ */
+message BlockIdCommandProto {
+  enum Action {
+    CACHE = 1;
+    UNCACHE = 2;
+  }
+  required Action action = 1;
+  required string blockPoolId = 2;
+  repeated uint64 blockIds = 3 [packed=true];
+}
+
+/**
  * List of blocks to be recovered by the datanode
  */
 message BlockRecoveryCommandProto {

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1532116&r1=1532115&r2=1532116&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
(original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Mon Oct 14 22:19:10 2013
@@ -580,13 +580,13 @@ public class SimulatedFSDataset implemen
   }
 
   @Override // FSDatasetSpi
-  public void cache(String bpid, Block[] cacheBlks) {
+  public void cache(String bpid, long[] cacheBlks) {
     throw new UnsupportedOperationException(
         "SimulatedFSDataset does not support cache operation!");
   }
 
   @Override // FSDatasetSpi
-  public void uncache(String bpid, Block[] uncacheBlks) {
+  public void uncache(String bpid, long[] uncacheBlks) {
     throw new UnsupportedOperationException(
         "SimulatedFSDataset does not support uncache operation!");
   }



Mime
View raw message