hadoop-hdfs-commits mailing list archives

From: w...@apache.org
Subject: svn commit: r1560522 [3/9] - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/m...
Date: Wed, 22 Jan 2014 21:43:04 GMT
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Wed Jan 22 21:43:00 2014
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.protocolPB;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,18 +28,26 @@ import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolStats;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
@@ -52,12 +62,21 @@ import org.apache.hadoop.hdfs.protocol.S
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheFlagProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolStatsProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
@@ -121,6 +140,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -151,7 +171,9 @@ import org.apache.hadoop.security.proto.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.primitives.Shorts;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
 
@@ -482,27 +504,14 @@ public class PBHelper {
         PBHelper.convert(di.getId()),
         di.hasLocation() ? di.getLocation() : null , 
         di.getCapacity(),  di.getDfsUsed(),  di.getRemaining(),
-        di.getBlockPoolUsed()  ,  di.getLastUpdate() , di.getXceiverCount() ,
+        di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
+        di.getLastUpdate(), di.getXceiverCount(),
         PBHelper.convert(di.getAdminState())); 
   }
   
   static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
     if (di == null) return null;
-    DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
-    if (di.getNetworkLocation() != null) {
-      builder.setLocation(di.getNetworkLocation());
-    }
-        
-    return builder.
-     setId(PBHelper.convert((DatanodeID) di)).
-     setCapacity(di.getCapacity()).
-     setDfsUsed(di.getDfsUsed()).
-     setRemaining(di.getRemaining()).
-     setBlockPoolUsed(di.getBlockPoolUsed()).
-     setLastUpdate(di.getLastUpdate()).
-     setXceiverCount(di.getXceiverCount()).
-     setAdminState(PBHelper.convert(di.getAdminState())).
-     build();     
+    return convert(di);
   }
   
   
@@ -546,15 +555,20 @@ public class PBHelper {
   
   public static DatanodeInfoProto convert(DatanodeInfo info) {
     DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
-    builder.setBlockPoolUsed(info.getBlockPoolUsed());
-    builder.setAdminState(PBHelper.convert(info.getAdminState()));
-    builder.setCapacity(info.getCapacity())
-        .setDfsUsed(info.getDfsUsed())
+    if (info.getNetworkLocation() != null) {
+      builder.setLocation(info.getNetworkLocation());
+    }
+    builder
         .setId(PBHelper.convert((DatanodeID)info))
-        .setLastUpdate(info.getLastUpdate())
-        .setLocation(info.getNetworkLocation())
+        .setCapacity(info.getCapacity())
+        .setDfsUsed(info.getDfsUsed())
         .setRemaining(info.getRemaining())
+        .setBlockPoolUsed(info.getBlockPoolUsed())
+        .setCacheCapacity(info.getCacheCapacity())
+        .setCacheUsed(info.getCacheUsed())
+        .setLastUpdate(info.getLastUpdate())
         .setXceiverCount(info.getXceiverCount())
+        .setAdminState(PBHelper.convert(info.getAdminState()))
         .build();
     return builder.build();
   }
@@ -575,9 +589,20 @@ public class PBHelper {
     if (b == null) return null;
     Builder builder = LocatedBlockProto.newBuilder();
     DatanodeInfo[] locs = b.getLocations();
+    List<DatanodeInfo> cachedLocs =
+        Lists.newLinkedList(Arrays.asList(b.getCachedLocations()));
     for (int i = 0; i < locs.length; i++) {
-      builder.addLocs(i, PBHelper.convert(locs[i]));
+      DatanodeInfo loc = locs[i];
+      builder.addLocs(i, PBHelper.convert(loc));
+      boolean locIsCached = cachedLocs.contains(loc);
+      builder.addIsCached(locIsCached);
+      if (locIsCached) {
+        cachedLocs.remove(loc);
+      }
     }
+    Preconditions.checkArgument(cachedLocs.size() == 0,
+        "Found additional cached replica locations that are not in the set of"
+        + " storage-backed locations!");
 
     StorageType[] storageTypes = b.getStorageTypes();
     if (storageTypes != null) {
@@ -621,9 +646,20 @@ public class PBHelper {
       storageIDs = proto.getStorageIDsList().toArray(new String[storageIDsCount]);
     }
 
+    // Set values from the isCached list, re-using references from loc
+    List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size());
+    List<Boolean> isCachedList = proto.getIsCachedList();
+    for (int i=0; i<isCachedList.size(); i++) {
+      if (isCachedList.get(i)) {
+        cachedLocs.add(targets[i]);
+      }
+    }
+
     LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
-        storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt());
+        storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
+        cachedLocs.toArray(new DatanodeInfo[0]));
     lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
+
     return lb;
   }
 
@@ -712,6 +748,8 @@ public class PBHelper {
       return PBHelper.convert(proto.getKeyUpdateCmd());
     case RegisterCommand:
       return REG_CMD;
+    case BlockIdCommand:
+      return PBHelper.convert(proto.getBlkIdCmd());
     }
     return null;
   }
@@ -765,6 +803,26 @@ public class PBHelper {
            .addAllTargetStorageUuids(convert(cmd.getTargetStorageIDs()));
     return builder.build();
   }
+  
+  public static BlockIdCommandProto convert(BlockIdCommand cmd) {
+    BlockIdCommandProto.Builder builder = BlockIdCommandProto.newBuilder()
+        .setBlockPoolId(cmd.getBlockPoolId());
+    switch (cmd.getAction()) {
+    case DatanodeProtocol.DNA_CACHE:
+      builder.setAction(BlockIdCommandProto.Action.CACHE);
+      break;
+    case DatanodeProtocol.DNA_UNCACHE:
+      builder.setAction(BlockIdCommandProto.Action.UNCACHE);
+      break;
+    default:
+      throw new AssertionError("Invalid action");
+    }
+    long[] blockIds = cmd.getBlockIds();
+    for (int i = 0; i < blockIds.length; i++) {
+      builder.addBlockIds(blockIds[i]);
+    }
+    return builder.build();
+  }
 
   private static List<DatanodeInfosProto> convert(DatanodeInfo[][] targets) {
     DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
@@ -817,8 +875,13 @@ public class PBHelper {
     case DatanodeProtocol.DNA_TRANSFER:
     case DatanodeProtocol.DNA_INVALIDATE:
     case DatanodeProtocol.DNA_SHUTDOWN:
-      builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
-          PBHelper.convert((BlockCommand) datanodeCommand));
+      builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).
+        setBlkCmd(PBHelper.convert((BlockCommand) datanodeCommand));
+      break;
+    case DatanodeProtocol.DNA_CACHE:
+    case DatanodeProtocol.DNA_UNCACHE:
+      builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand).
+        setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand));
       break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
@@ -877,11 +940,33 @@ public class PBHelper {
     case SHUTDOWN:
       action = DatanodeProtocol.DNA_SHUTDOWN;
       break;
+    default:
+      throw new AssertionError("Unknown action type: " + blkCmd.getAction());
     }
     return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets,
         targetStorageIDs);
   }
 
+  public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
+    int numBlockIds = blkIdCmd.getBlockIdsCount();
+    long blockIds[] = new long[numBlockIds];
+    for (int i = 0; i < numBlockIds; i++) {
+      blockIds[i] = blkIdCmd.getBlockIds(i);
+    }
+    int action = DatanodeProtocol.DNA_UNKNOWN;
+    switch (blkIdCmd.getAction()) {
+    case CACHE:
+      action = DatanodeProtocol.DNA_CACHE;
+      break;
+    case UNCACHE:
+      action = DatanodeProtocol.DNA_UNCACHE;
+      break;
+    default:
+      throw new AssertionError("Unknown action type: " + blkIdCmd.getAction());
+    }
+    return new BlockIdCommand(action, blkIdCmd.getBlockPoolId(), blockIds);
+  }
+
   public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {
     List<DatanodeInfoProto> proto = datanodeInfosProto.getDatanodesList();
     DatanodeInfo[] infos = new DatanodeInfo[proto.size()];
@@ -1090,7 +1175,7 @@ public class PBHelper {
     return value;
   }
   
-  public static EnumSetWritable<CreateFlag> convert(int flag) {
+  public static EnumSetWritable<CreateFlag> convertCreateFlag(int flag) {
     EnumSet<CreateFlag> result = 
        EnumSet.noneOf(CreateFlag.class);   
     if ((flag & CreateFlagProto.APPEND_VALUE) == CreateFlagProto.APPEND_VALUE) {
@@ -1105,7 +1190,23 @@ public class PBHelper {
     }
     return new EnumSetWritable<CreateFlag>(result);
   }
-  
+
+  public static int convertCacheFlags(EnumSet<CacheFlag> flags) {
+    int value = 0;
+    if (flags.contains(CacheFlag.FORCE)) {
+      value |= CacheFlagProto.FORCE.getNumber();
+    }
+    return value;
+  }
+
+  public static EnumSet<CacheFlag> convertCacheFlags(int flags) {
+    EnumSet<CacheFlag> result = EnumSet.noneOf(CacheFlag.class);
+    if ((flags & CacheFlagProto.FORCE_VALUE) == CacheFlagProto.FORCE_VALUE) {
+      result.add(CacheFlag.FORCE);
+    }
+    return result;
+  }
+
   public static HdfsFileStatus convert(HdfsFileStatusProto fs) {
     if (fs == null)
       return null;
@@ -1455,11 +1556,12 @@ public class PBHelper {
   }
 
   public static StorageReportProto convert(StorageReport r) {
-    return StorageReportProto.newBuilder()
+    StorageReportProto.Builder builder = StorageReportProto.newBuilder()
         .setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
         .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
         .setStorageUuid(r.getStorage().getStorageID())
-        .setStorage(convert(r.getStorage())).build();
+        .setStorage(convert(r.getStorage()));
+    return builder.build();
   }
 
   public static StorageReport convert(StorageReportProto p) {
@@ -1598,6 +1700,178 @@ public class PBHelper {
     return DataChecksum.Type.valueOf(type.getNumber());
   }
 
+  public static CacheDirectiveInfoProto convert
+      (CacheDirectiveInfo info) {
+    CacheDirectiveInfoProto.Builder builder = 
+        CacheDirectiveInfoProto.newBuilder();
+    if (info.getId() != null) {
+      builder.setId(info.getId());
+    }
+    if (info.getPath() != null) {
+      builder.setPath(info.getPath().toUri().getPath());
+    }
+    if (info.getReplication() != null) {
+      builder.setReplication(info.getReplication());
+    }
+    if (info.getPool() != null) {
+      builder.setPool(info.getPool());
+    }
+    if (info.getExpiration() != null) {
+      builder.setExpiration(convert(info.getExpiration()));
+    }
+    return builder.build();
+  }
+
+  public static CacheDirectiveInfo convert
+      (CacheDirectiveInfoProto proto) {
+    CacheDirectiveInfo.Builder builder =
+        new CacheDirectiveInfo.Builder();
+    if (proto.hasId()) {
+      builder.setId(proto.getId());
+    }
+    if (proto.hasPath()) {
+      builder.setPath(new Path(proto.getPath()));
+    }
+    if (proto.hasReplication()) {
+      builder.setReplication(Shorts.checkedCast(
+          proto.getReplication()));
+    }
+    if (proto.hasPool()) {
+      builder.setPool(proto.getPool());
+    }
+    if (proto.hasExpiration()) {
+      builder.setExpiration(convert(proto.getExpiration()));
+    }
+    return builder.build();
+  }
+
+  public static CacheDirectiveInfoExpirationProto convert(
+      CacheDirectiveInfo.Expiration expiration) {
+    return CacheDirectiveInfoExpirationProto.newBuilder()
+        .setIsRelative(expiration.isRelative())
+        .setMillis(expiration.getMillis())
+        .build();
+  }
+
+  public static CacheDirectiveInfo.Expiration convert(
+      CacheDirectiveInfoExpirationProto proto) {
+    if (proto.getIsRelative()) {
+      return CacheDirectiveInfo.Expiration.newRelative(proto.getMillis());
+    }
+    return CacheDirectiveInfo.Expiration.newAbsolute(proto.getMillis());
+  }
+
+  public static CacheDirectiveStatsProto convert(CacheDirectiveStats stats) {
+    CacheDirectiveStatsProto.Builder builder = 
+        CacheDirectiveStatsProto.newBuilder();
+    builder.setBytesNeeded(stats.getBytesNeeded());
+    builder.setBytesCached(stats.getBytesCached());
+    builder.setFilesNeeded(stats.getFilesNeeded());
+    builder.setFilesCached(stats.getFilesCached());
+    builder.setHasExpired(stats.hasExpired());
+    return builder.build();
+  }
+  
+  public static CacheDirectiveStats convert(CacheDirectiveStatsProto proto) {
+    CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder();
+    builder.setBytesNeeded(proto.getBytesNeeded());
+    builder.setBytesCached(proto.getBytesCached());
+    builder.setFilesNeeded(proto.getFilesNeeded());
+    builder.setFilesCached(proto.getFilesCached());
+    builder.setHasExpired(proto.getHasExpired());
+    return builder.build();
+  }
+
+  public static CacheDirectiveEntryProto convert(CacheDirectiveEntry entry) {
+    CacheDirectiveEntryProto.Builder builder = 
+        CacheDirectiveEntryProto.newBuilder();
+    builder.setInfo(PBHelper.convert(entry.getInfo()));
+    builder.setStats(PBHelper.convert(entry.getStats()));
+    return builder.build();
+  }
+  
+  public static CacheDirectiveEntry convert(CacheDirectiveEntryProto proto) {
+    CacheDirectiveInfo info = PBHelper.convert(proto.getInfo());
+    CacheDirectiveStats stats = PBHelper.convert(proto.getStats());
+    return new CacheDirectiveEntry(info, stats);
+  }
+
+  public static CachePoolInfoProto convert(CachePoolInfo info) {
+    CachePoolInfoProto.Builder builder = CachePoolInfoProto.newBuilder();
+    builder.setPoolName(info.getPoolName());
+    if (info.getOwnerName() != null) {
+      builder.setOwnerName(info.getOwnerName());
+    }
+    if (info.getGroupName() != null) {
+      builder.setGroupName(info.getGroupName());
+    }
+    if (info.getMode() != null) {
+      builder.setMode(info.getMode().toShort());
+    }
+    if (info.getLimit() != null) {
+      builder.setLimit(info.getLimit());
+    }
+    if (info.getMaxRelativeExpiryMs() != null) {
+      builder.setMaxRelativeExpiry(info.getMaxRelativeExpiryMs());
+    }
+    return builder.build();
+  }
+
+  public static CachePoolInfo convert (CachePoolInfoProto proto) {
+    // Pool name is a required field, the rest are optional
+    String poolName = checkNotNull(proto.getPoolName());
+    CachePoolInfo info = new CachePoolInfo(poolName);
+    if (proto.hasOwnerName()) {
+        info.setOwnerName(proto.getOwnerName());
+    }
+    if (proto.hasGroupName()) {
+      info.setGroupName(proto.getGroupName());
+    }
+    if (proto.hasMode()) {
+      info.setMode(new FsPermission((short)proto.getMode()));
+    }
+    if (proto.hasLimit())  {
+      info.setLimit(proto.getLimit());
+    }
+    if (proto.hasMaxRelativeExpiry()) {
+      info.setMaxRelativeExpiryMs(proto.getMaxRelativeExpiry());
+    }
+    return info;
+  }
+
+  public static CachePoolStatsProto convert(CachePoolStats stats) {
+    CachePoolStatsProto.Builder builder = CachePoolStatsProto.newBuilder();
+    builder.setBytesNeeded(stats.getBytesNeeded());
+    builder.setBytesCached(stats.getBytesCached());
+    builder.setBytesOverlimit(stats.getBytesOverlimit());
+    builder.setFilesNeeded(stats.getFilesNeeded());
+    builder.setFilesCached(stats.getFilesCached());
+    return builder.build();
+  }
+
+  public static CachePoolStats convert (CachePoolStatsProto proto) {
+    CachePoolStats.Builder builder = new CachePoolStats.Builder();
+    builder.setBytesNeeded(proto.getBytesNeeded());
+    builder.setBytesCached(proto.getBytesCached());
+    builder.setBytesOverlimit(proto.getBytesOverlimit());
+    builder.setFilesNeeded(proto.getFilesNeeded());
+    builder.setFilesCached(proto.getFilesCached());
+    return builder.build();
+  }
+
+  public static CachePoolEntryProto convert(CachePoolEntry entry) {
+    CachePoolEntryProto.Builder builder = CachePoolEntryProto.newBuilder();
+    builder.setInfo(PBHelper.convert(entry.getInfo()));
+    builder.setStats(PBHelper.convert(entry.getStats()));
+    return builder.build();
+  }
+
+  public static CachePoolEntry convert (CachePoolEntryProto proto) {
+    CachePoolInfo info = PBHelper.convert(proto.getInfo());
+    CachePoolStats stats = PBHelper.convert(proto.getStats());
+    return new CachePoolEntry(info, stats);
+  }
+  
   public static HdfsProtos.ChecksumTypeProto convert(DataChecksum.Type type) {
     return HdfsProtos.ChecksumTypeProto.valueOf(type.id);
   }

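A minimal round-trip sketch (illustrative, not from this commit) of the new cache-directive conversions above, assuming only the PBHelper overloads and CacheDirectiveInfo.Builder setters visible in the hunks; the path and pool names are invented for illustration:

import java.util.EnumSet;

import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;

public class CacheDirectivePbRoundTrip {
  public static void main(String[] args) {
    // Build a directive the way a client would before sending it over the wire.
    CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
        .setPath(new Path("/warm/dataset"))   // hypothetical path
        .setReplication((short) 2)
        .setPool("analytics")                 // hypothetical pool name
        .build();

    // Info -> proto -> info; optional fields that were never set stay unset.
    CacheDirectiveInfoProto proto = PBHelper.convert(info);
    CacheDirectiveInfo back = PBHelper.convert(proto);

    // CacheFlag sets travel as a bit mask; FORCE maps onto CacheFlagProto.FORCE.
    int flagBits = PBHelper.convertCacheFlags(EnumSet.of(CacheFlag.FORCE));
    EnumSet<CacheFlag> flags = PBHelper.convertCacheFlags(flagBits);

    System.out.println(back.getPool() + " " + flags);
  }
}
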
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Wed Jan 22 21:43:00 2014
@@ -158,7 +158,7 @@ public class BlockInfo extends Block imp
     return info;
   }
 
-  int getCapacity() {
+  public int getCapacity() {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert triplets.length % 3 == 0 : "Malformed BlockInfo";
     return triplets.length / 3;

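For context, getCapacity() is widened from package-private to public; the caching code added in this commit walks a block's replica slots with it (see addNewPendingCached further down), and the asserts above show why the value is triplets.length / 3: the triplets array packs three references per replica slot. A sketch of that consuming loop (illustrative, not from the commit; kept in the same package so the other BlockInfo accessors stay visible):

package org.apache.hadoop.hdfs.server.blockmanagement;

// Sketch only: visit every datanode currently recorded for blockInfo.
class ReplicaSlotWalkSketch {
  static void visit(BlockInfo blockInfo) {
    // getCapacity() == triplets.length / 3: one slot of three references per replica.
    for (int i = 0; i < blockInfo.getCapacity(); i++) {
      DatanodeDescriptor datanode = blockInfo.getDatanode(i);
      if (datanode == null) {
        continue; // unused slot
      }
      // The monitor then filters on decommission state, corrupt replicas,
      // and remaining cache capacity before considering this node.
    }
  }
}
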
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Wed Jan 22 21:43:00 2014
@@ -3180,6 +3180,13 @@ assert storedBlock.findDatanode(dn) < 0 
         UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
   }
 
+  /**
+   * Get the replicas which are corrupt for a given block.
+   */
+  public Collection<DatanodeDescriptor> getCorruptReplicas(Block block) {
+    return corruptReplicas.getNodes(block);
+  }
+
   /** @return the size of UnderReplicatedBlocks */
   public int numOfUnderReplicatedBlocks() {
     return neededReplications.size();

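The new accessor simply exposes corruptReplicas.getNodes(block); the cache monitor added below uses it to drop corrupt locations from its caching candidates. A small sketch of that filter (illustrative, not from the commit; same package, with the null guard mirroring the one in addNewPendingCached):

package org.apache.hadoop.hdfs.server.blockmanagement;

import java.util.Collection;

// Sketch only: decide whether a datanode is eligible to cache blockInfo.
class CorruptReplicaFilterSketch {
  static boolean isUsableCachingTarget(BlockManager blockManager,
      BlockInfo blockInfo, DatanodeDescriptor datanode) {
    Collection<DatanodeDescriptor> corrupt =
        blockManager.getCorruptReplicas(blockInfo);
    // The monitor guards against a null return (no corrupt replicas recorded
    // for this block), so this sketch does too.
    return corrupt == null || !corrupt.contains(datanode);
  }
}
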
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1560522&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Wed Jan 22 21:43:00 2014
@@ -0,0 +1,774 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirective;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.namenode.CacheManager;
+import org.apache.hadoop.hdfs.server.namenode.CachePool;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Scans the namesystem, scheduling blocks to be cached as appropriate.
+ *
+ * The CacheReplicationMonitor does a full scan when the NameNode first
+ * starts up, and at configurable intervals afterwards.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+public class CacheReplicationMonitor extends Thread implements Closeable {
+
+  private static final Log LOG =
+      LogFactory.getLog(CacheReplicationMonitor.class);
+
+  private final FSNamesystem namesystem;
+
+  private final BlockManager blockManager;
+
+  private final CacheManager cacheManager;
+
+  private final GSet<CachedBlock, CachedBlock> cachedBlocks;
+
+  /**
+   * Pseudorandom number source
+   */
+  private static final Random random = new Random();
+
+  /**
+   * The interval at which we scan the namesystem for caching changes.
+   */
+  private final long intervalMs;
+
+  /**
+   * The CacheReplicationMonitor (CRM) lock. Used to synchronize starting and
+   * waiting for rescan operations.
+   */
+  private final ReentrantLock lock;
+
+  /**
+   * Notifies the scan thread that an immediate rescan is needed.
+   */
+  private final Condition doRescan;
+
+  /**
+   * Notifies waiting threads that a rescan has finished.
+   */
+  private final Condition scanFinished;
+
+  /**
+   * Whether there are pending CacheManager operations that necessitate a
+   * CacheReplicationMonitor rescan. Protected by the CRM lock.
+   */
+  private boolean needsRescan = true;
+
+  /**
+   * Whether we are currently doing a rescan. Protected by the CRM lock.
+   */
+  private boolean isScanning = false;
+
+  /**
+   * The number of rescans completed. Used to wait for scans to finish.
+   * Protected by the CacheReplicationMonitor lock.
+   */
+  private long scanCount = 0;
+
+  /**
+   * True if this monitor should terminate. Protected by the CRM lock.
+   */
+  private boolean shutdown = false;
+
+  /**
+   * Mark status of the current scan.
+   */
+  private boolean mark = false;
+
+  /**
+   * Cache directives found in the previous scan.
+   */
+  private int scannedDirectives;
+
+  /**
+   * Blocks found in the previous scan.
+   */
+  private long scannedBlocks;
+
+  public CacheReplicationMonitor(FSNamesystem namesystem,
+      CacheManager cacheManager, long intervalMs, ReentrantLock lock) {
+    this.namesystem = namesystem;
+    this.blockManager = namesystem.getBlockManager();
+    this.cacheManager = cacheManager;
+    this.cachedBlocks = cacheManager.getCachedBlocks();
+    this.intervalMs = intervalMs;
+    this.lock = lock;
+    this.doRescan = this.lock.newCondition();
+    this.scanFinished = this.lock.newCondition();
+  }
+
+  @Override
+  public void run() {
+    long startTimeMs = 0;
+    Thread.currentThread().setName("CacheReplicationMonitor(" +
+        System.identityHashCode(this) + ")");
+    LOG.info("Starting CacheReplicationMonitor with interval " +
+             intervalMs + " milliseconds");
+    try {
+      long curTimeMs = Time.monotonicNow();
+      while (true) {
+        lock.lock();
+        try {
+          while (true) {
+            if (shutdown) {
+              LOG.info("Shutting down CacheReplicationMonitor");
+              return;
+            }
+            if (needsRescan) {
+              LOG.info("Rescanning because of pending operations");
+              break;
+            }
+            long delta = (startTimeMs + intervalMs) - curTimeMs;
+            if (delta <= 0) {
+              LOG.info("Rescanning after " + (curTimeMs - startTimeMs) +
+                  " milliseconds");
+              break;
+            }
+            doRescan.await(delta, TimeUnit.MILLISECONDS);
+            curTimeMs = Time.monotonicNow();
+          }
+          isScanning = true;
+          needsRescan = false;
+        } finally {
+          lock.unlock();
+        }
+        startTimeMs = curTimeMs;
+        mark = !mark;
+        rescan();
+        curTimeMs = Time.monotonicNow();
+        // Update synchronization-related variables.
+        lock.lock();
+        try {
+          isScanning = false;
+          scanCount++;
+          scanFinished.signalAll();
+        } finally {
+          lock.unlock();
+        }
+        LOG.info("Scanned " + scannedDirectives + " directive(s) and " +
+            scannedBlocks + " block(s) in " + (curTimeMs - startTimeMs) + " " +
+            "millisecond(s).");
+      }
+    } catch (InterruptedException e) {
+      LOG.info("Shutting down CacheReplicationMonitor.");
+      return;
+    } catch (Throwable t) {
+      LOG.fatal("Thread exiting", t);
+      terminate(1, t);
+    }
+  }
+
+  /**
+   * Waits for a rescan to complete. This doesn't guarantee consistency with
+   * pending operations, only relative recency, since it will not force a new
+   * rescan if a rescan is already underway.
+   * <p>
+   * Note that this call will release the FSN lock, so operations before and
+   * after are not atomic.
+   */
+  public void waitForRescanIfNeeded() {
+    Preconditions.checkArgument(!namesystem.hasWriteLock(),
+        "Must not hold the FSN write lock when waiting for a rescan.");
+    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
+        "Must hold the CRM lock when waiting for a rescan.");
+    if (!needsRescan) {
+      return;
+    }
+    // If no scan is already ongoing, mark the CRM as dirty and kick
+    if (!isScanning) {
+      doRescan.signal();
+    }
+    // Wait until the scan finishes and the count advances
+    final long startCount = scanCount;
+    while ((!shutdown) && (startCount >= scanCount)) {
+      try {
+        scanFinished.await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
+            + " rescan", e);
+        break;
+      }
+    }
+  }
+
+  /**
+   * Indicates to the CacheReplicationMonitor that there have been CacheManager
+   * changes that require a rescan.
+   */
+  public void setNeedsRescan() {
+    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
+        "Must hold the CRM lock when setting the needsRescan bit.");
+    this.needsRescan = true;
+  }
+
+  /**
+   * Shut down the monitor thread.
+   */
+  @Override
+  public void close() throws IOException {
+    Preconditions.checkArgument(namesystem.hasWriteLock());
+    lock.lock();
+    try {
+      if (shutdown) return;
+      // Since we hold both the FSN write lock and the CRM lock here,
+      // we know that the CRM thread cannot be currently modifying
+      // the cache manager state while we're closing it.
+      // Since the CRM thread checks the value of 'shutdown' after waiting
+      // for a lock, we know that the thread will not modify the cache
+      // manager state after this point.
+      shutdown = true;
+      doRescan.signalAll();
+      scanFinished.signalAll();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void rescan() throws InterruptedException {
+    scannedDirectives = 0;
+    scannedBlocks = 0;
+    namesystem.writeLock();
+    try {
+      if (shutdown) {
+        throw new InterruptedException("CacheReplicationMonitor was " +
+            "shut down.");
+      }
+      resetStatistics();
+      rescanCacheDirectives();
+      rescanCachedBlockMap();
+      blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  private void resetStatistics() {
+    for (CachePool pool: cacheManager.getCachePools()) {
+      pool.resetStatistics();
+    }
+    for (CacheDirective directive: cacheManager.getCacheDirectives()) {
+      directive.resetStatistics();
+    }
+  }
+
+  /**
+   * Scan all CacheDirectives.  Use the information to figure out
+   * what cache replication factor each block should have.
+   */
+  private void rescanCacheDirectives() {
+    FSDirectory fsDir = namesystem.getFSDirectory();
+    final long now = new Date().getTime();
+    for (CacheDirective directive : cacheManager.getCacheDirectives()) {
+      // Skip processing this entry if it has expired
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Directive expiry is at " + directive.getExpiryTime());
+      }
+      if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping directive id " + directive.getId()
+              + " because it has expired (" + directive.getExpiryTime() + "<="
+              + now + ")");
+        }
+        continue;
+      }
+      scannedDirectives++;
+      String path = directive.getPath();
+      INode node;
+      try {
+        node = fsDir.getINode(path);
+      } catch (UnresolvedLinkException e) {
+        // We don't cache through symlinks
+        continue;
+      }
+      if (node == null)  {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("No inode found at " + path);
+        }
+      } else if (node.isDirectory()) {
+        INodeDirectory dir = node.asDirectory();
+        ReadOnlyList<INode> children = dir.getChildrenList(null);
+        for (INode child : children) {
+          if (child.isFile()) {
+            rescanFile(directive, child.asFile());
+          }
+        }
+      } else if (node.isFile()) {
+        rescanFile(directive, node.asFile());
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Ignoring non-directory, non-file inode " + node +
+                    " found at " + path);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Apply a CacheDirective to a file.
+   * 
+   * @param directive The CacheDirective to apply.
+   * @param file The file.
+   */
+  private void rescanFile(CacheDirective directive, INodeFile file) {
+    BlockInfo[] blockInfos = file.getBlocks();
+
+    // Increment the "needed" statistics
+    directive.addFilesNeeded(1);
+    // We don't cache UC blocks, don't add them to the total here
+    long neededTotal = file.computeFileSizeNotIncludingLastUcBlock() *
+        directive.getReplication();
+    directive.addBytesNeeded(neededTotal);
+
+    // The pool's bytesNeeded is incremented as we scan. If the demand
+    // thus far plus the demand of this file would exceed the pool's limit,
+    // do not cache this file.
+    CachePool pool = directive.getPool();
+    if (pool.getBytesNeeded() > pool.getLimit()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("Skipping directive id %d file %s because "
+            + "limit of pool %s would be exceeded (%d > %d)",
+            directive.getId(),
+            file.getFullPathName(),
+            pool.getPoolName(),
+            pool.getBytesNeeded(),
+            pool.getLimit()));
+      }
+      return;
+    }
+
+    long cachedTotal = 0;
+    for (BlockInfo blockInfo : blockInfos) {
+      if (!blockInfo.getBlockUCState().equals(BlockUCState.COMPLETE)) {
+        // We don't try to cache blocks that are under construction.
+        continue;
+      }
+      Block block = new Block(blockInfo.getBlockId());
+      CachedBlock ncblock = new CachedBlock(block.getBlockId(),
+          directive.getReplication(), mark);
+      CachedBlock ocblock = cachedBlocks.get(ncblock);
+      if (ocblock == null) {
+        cachedBlocks.put(ncblock);
+      } else {
+        // Update bytesUsed using the current replication levels.
+        // Assumptions: we assume that all the blocks are the same length
+        // on each datanode.  We can assume this because we're only caching
+        // blocks in state COMMITTED.
+        // Note that if two directives are caching the same block(s), they will
+        // both get them added to their bytesCached.
+        List<DatanodeDescriptor> cachedOn =
+            ocblock.getDatanodes(Type.CACHED);
+        long cachedByBlock = Math.min(cachedOn.size(),
+            directive.getReplication()) * blockInfo.getNumBytes();
+        cachedTotal += cachedByBlock;
+
+        if ((mark != ocblock.getMark()) ||
+            (ocblock.getReplication() < directive.getReplication())) {
+          //
+          // Overwrite the block's replication and mark in two cases:
+          //
+          // 1. If the mark on the CachedBlock is different from the mark for
+          // this scan, that means the block hasn't been updated during this
+          // scan, and we should overwrite whatever is there, since it is no
+          // longer valid.
+          //
+          // 2. If the replication in the CachedBlock is less than what the
+          // directive asks for, we want to increase the block's replication
+          // field to what the directive asks for.
+          //
+          ocblock.setReplicationAndMark(directive.getReplication(), mark);
+        }
+      }
+    }
+    // Increment the "cached" statistics
+    directive.addBytesCached(cachedTotal);
+    if (cachedTotal == neededTotal) {
+      directive.addFilesCached(1);
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Directive " + directive.getId() + " is caching " +
+          file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal +
+          " bytes");
+    }
+  }
+
+  private String findReasonForNotCaching(CachedBlock cblock, 
+          BlockInfo blockInfo) {
+    if (blockInfo == null) {
+      // Somehow, a cache report with the block arrived, but the block
+      // reports from the DataNode haven't (yet?) described such a block.
+      // Alternately, the NameNode might have invalidated the block, but the
+      // DataNode hasn't caught up.  In any case, we want to tell the DN
+      // to uncache this.
+      return "not tracked by the BlockManager";
+    } else if (!blockInfo.isComplete()) {
+      // When a cached block changes state from complete to some other state
+      // on the DataNode (perhaps because of append), it will begin the
+      // uncaching process.  However, the uncaching process is not
+      // instantaneous, especially if clients have pinned the block.  So
+      // there may be a period of time when incomplete blocks remain cached
+      // on the DataNodes.
+      return "not complete";
+    } else if (cblock.getReplication() == 0) {
+      // Since 0 is not a valid value for a cache directive's replication
+      // field, seeing a replication of 0 on a CacheBlock means that it
+      // has never been reached by any sweep.
+      return "not needed by any directives";
+    } else if (cblock.getMark() != mark) { 
+      // Although the block was needed in the past, we didn't reach it during
+      // the current sweep.  Therefore, it doesn't need to be cached any more.
+      // Need to set the replication to 0 so it doesn't flip back to cached
+      // when the mark flips on the next scan
+      cblock.setReplicationAndMark((short)0, mark);
+      return "no longer needed by any directives";
+    }
+    return null;
+  }
+
+  /**
+   * Scan through the cached block map.
+   * Any blocks which are under-replicated should be assigned new Datanodes.
+   * Blocks that are over-replicated should be removed from Datanodes.
+   */
+  private void rescanCachedBlockMap() {
+    for (Iterator<CachedBlock> cbIter = cachedBlocks.iterator();
+        cbIter.hasNext(); ) {
+      scannedBlocks++;
+      CachedBlock cblock = cbIter.next();
+      List<DatanodeDescriptor> pendingCached =
+          cblock.getDatanodes(Type.PENDING_CACHED);
+      List<DatanodeDescriptor> cached =
+          cblock.getDatanodes(Type.CACHED);
+      List<DatanodeDescriptor> pendingUncached =
+          cblock.getDatanodes(Type.PENDING_UNCACHED);
+      // Remove nodes from PENDING_UNCACHED if they were actually uncached.
+      for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator();
+          iter.hasNext(); ) {
+        DatanodeDescriptor datanode = iter.next();
+        if (!cblock.isInList(datanode.getCached())) {
+          datanode.getPendingUncached().remove(cblock);
+          iter.remove();
+        }
+      }
+      BlockInfo blockInfo = blockManager.
+            getStoredBlock(new Block(cblock.getBlockId()));
+      String reason = findReasonForNotCaching(cblock, blockInfo);
+      int neededCached = 0;
+      if (reason != null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("not caching " + cblock + " because it is " + reason);
+        }
+      } else {
+        neededCached = cblock.getReplication();
+      }
+      int numCached = cached.size();
+      if (numCached >= neededCached) {
+        // If we have enough replicas, drop all pending cached.
+        for (Iterator<DatanodeDescriptor> iter = pendingCached.iterator();
+            iter.hasNext(); ) {
+          DatanodeDescriptor datanode = iter.next();
+          datanode.getPendingCached().remove(cblock);
+          iter.remove();
+        }
+      }
+      if (numCached < neededCached) {
+        // If we don't have enough replicas, drop all pending uncached.
+        for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator();
+            iter.hasNext(); ) {
+          DatanodeDescriptor datanode = iter.next();
+          datanode.getPendingUncached().remove(cblock);
+          iter.remove();
+        }
+      }
+      int neededUncached = numCached -
+          (pendingUncached.size() + neededCached);
+      if (neededUncached > 0) {
+        addNewPendingUncached(neededUncached, cblock, cached,
+            pendingUncached);
+      } else {
+        int additionalCachedNeeded = neededCached -
+            (numCached + pendingCached.size());
+        if (additionalCachedNeeded > 0) {
+          addNewPendingCached(additionalCachedNeeded, cblock, cached,
+              pendingCached);
+        }
+      }
+      if ((neededCached == 0) &&
+          pendingUncached.isEmpty() &&
+          pendingCached.isEmpty()) {
+        // we have nothing more to do with this block.
+        cbIter.remove();
+      }
+    }
+  }
+
+  /**
+   * Add new entries to the PendingUncached list.
+   *
+   * @param neededUncached   The number of replicas that need to be uncached.
+   * @param cachedBlock      The block which needs to be uncached.
+   * @param cached           A list of DataNodes currently caching the block.
+   * @param pendingUncached  A list of DataNodes that will soon uncache the
+   *                         block.
+   */
+  private void addNewPendingUncached(int neededUncached,
+      CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
+      List<DatanodeDescriptor> pendingUncached) {
+    // Figure out which replicas can be uncached.
+    LinkedList<DatanodeDescriptor> possibilities =
+        new LinkedList<DatanodeDescriptor>();
+    for (DatanodeDescriptor datanode : cached) {
+      if (!pendingUncached.contains(datanode)) {
+        possibilities.add(datanode);
+      }
+    }
+    while (neededUncached > 0) {
+      if (possibilities.isEmpty()) {
+        LOG.warn("Logic error: we're trying to uncache more replicas than " +
+            "actually exist for " + cachedBlock);
+        return;
+      }
+      DatanodeDescriptor datanode =
+        possibilities.remove(random.nextInt(possibilities.size()));
+      pendingUncached.add(datanode);
+      boolean added = datanode.getPendingUncached().add(cachedBlock);
+      assert added;
+      neededUncached--;
+    }
+  }
+  
+  /**
+   * Add new entries to the PendingCached list.
+   *
+   * @param neededCached     The number of replicas that need to be cached.
+   * @param cachedBlock      The block which needs to be cached.
+   * @param cached           A list of DataNodes currently caching the block.
+   * @param pendingCached    A list of DataNodes that will soon cache the
+   *                         block.
+   */
+  private void addNewPendingCached(final int neededCached,
+      CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
+      List<DatanodeDescriptor> pendingCached) {
+    // To figure out which replicas can be cached, we consult the
+    // blocksMap.  We don't want to try to cache a corrupt replica, though.
+    BlockInfo blockInfo = blockManager.
+          getStoredBlock(new Block(cachedBlock.getBlockId()));
+    if (blockInfo == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not caching block " + cachedBlock + " because there " +
+            "is no record of it on the NameNode.");
+      }
+      return;
+    }
+    if (!blockInfo.isComplete()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Not caching block " + cachedBlock + " because it " +
+            "is not yet complete.");
+      }
+      return;
+    }
+    // Filter the list of replicas to only the valid targets
+    List<DatanodeDescriptor> possibilities =
+        new LinkedList<DatanodeDescriptor>();
+    int numReplicas = blockInfo.getCapacity();
+    Collection<DatanodeDescriptor> corrupt =
+        blockManager.getCorruptReplicas(blockInfo);
+    int outOfCapacity = 0;
+    for (int i = 0; i < numReplicas; i++) {
+      DatanodeDescriptor datanode = blockInfo.getDatanode(i);
+      if (datanode == null) {
+        continue;
+      }
+      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+        continue;
+      }
+      if (corrupt != null && corrupt.contains(datanode)) {
+        continue;
+      }
+      if (pendingCached.contains(datanode) || cached.contains(datanode)) {
+        continue;
+      }
+      long pendingCapacity = datanode.getCacheRemaining();
+      // Subtract pending cached blocks from effective capacity
+      Iterator<CachedBlock> it = datanode.getPendingCached().iterator();
+      while (it.hasNext()) {
+        CachedBlock cBlock = it.next();
+        BlockInfo info =
+            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+        if (info != null) {
+          pendingCapacity -= info.getNumBytes();
+        }
+      }
+      it = datanode.getPendingUncached().iterator();
+      // Add pending uncached blocks from effective capacity
+      while (it.hasNext()) {
+        CachedBlock cBlock = it.next();
+        BlockInfo info =
+            blockManager.getStoredBlock(new Block(cBlock.getBlockId()));
+        if (info != null) {
+          pendingCapacity += info.getNumBytes();
+        }
+      }
+      if (pendingCapacity < blockInfo.getNumBytes()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Datanode " + datanode + " is not a valid possibility for"
+              + " block " + blockInfo.getBlockId() + " of size "
+              + blockInfo.getNumBytes() + " bytes, only has "
+              + datanode.getCacheRemaining() + " bytes of cache remaining.");
+        }
+        outOfCapacity++;
+        continue;
+      }
+      possibilities.add(datanode);
+    }
+    List<DatanodeDescriptor> chosen = chooseDatanodesForCaching(possibilities,
+        neededCached, blockManager.getDatanodeManager().getStaleInterval());
+    for (DatanodeDescriptor datanode : chosen) {
+      pendingCached.add(datanode);
+      boolean added = datanode.getPendingCached().add(cachedBlock);
+      assert added;
+    }
+    // We were unable to satisfy the requested replication factor
+    if (neededCached > chosen.size()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Only have " +
+            (cachedBlock.getReplication() - neededCached + chosen.size()) +
+            " of " + cachedBlock.getReplication() + " cached replicas for " +
+            cachedBlock + " (" + outOfCapacity + " nodes have insufficient " +
+            "capacity).");
+      }
+    }
+  }
+
+  /**
+   * Chooses datanode locations for caching from a list of valid possibilities.
+   * Non-stale nodes are chosen before stale nodes.
+   * 
+   * @param possibilities List of candidate datanodes
+   * @param neededCached Number of replicas needed
+   * @param staleInterval Age of a stale datanode
+   * @return A list of chosen datanodes
+   */
+  private static List<DatanodeDescriptor> chooseDatanodesForCaching(
+      final List<DatanodeDescriptor> possibilities, final int neededCached,
+      final long staleInterval) {
+    // Make a copy that we can modify
+    List<DatanodeDescriptor> targets =
+        new ArrayList<DatanodeDescriptor>(possibilities);
+    // Selected targets
+    List<DatanodeDescriptor> chosen = new LinkedList<DatanodeDescriptor>();
+
+    // Filter out stale datanodes
+    List<DatanodeDescriptor> stale = new LinkedList<DatanodeDescriptor>();
+    Iterator<DatanodeDescriptor> it = targets.iterator();
+    while (it.hasNext()) {
+      DatanodeDescriptor d = it.next();
+      if (d.isStale(staleInterval)) {
+        it.remove();
+        stale.add(d);
+      }
+    }
+    // Select targets
+    while (chosen.size() < neededCached) {
+      // Try to use stale nodes if we're out of non-stale nodes, else we're done
+      if (targets.isEmpty()) {
+        if (!stale.isEmpty()) {
+          targets = stale;
+        } else {
+          break;
+        }
+      }
+      // Select a random target
+      DatanodeDescriptor target =
+          chooseRandomDatanodeByRemainingCapacity(targets);
+      chosen.add(target);
+      targets.remove(target);
+    }
+    return chosen;
+  }
+
+  /**
+   * Choose a single datanode from the provided list of possible
+   * targets, weighted by the percentage of free space remaining on the node.
+   * 
+   * @return The chosen datanode
+   */
+  private static DatanodeDescriptor chooseRandomDatanodeByRemainingCapacity(
+      final List<DatanodeDescriptor> targets) {
+    // Use a weighted probability to choose the target datanode
+    float total = 0;
+    for (DatanodeDescriptor d : targets) {
+      total += d.getCacheRemainingPercent();
+    }
+    // Give each datanode a portion of keyspace equal to its relative weight
+    // [0, w1) selects d1, [w1, w2) selects d2, etc.
+    TreeMap<Integer, DatanodeDescriptor> lottery =
+        new TreeMap<Integer, DatanodeDescriptor>();
+    int offset = 0;
+    for (DatanodeDescriptor d : targets) {
+      // Since we're using floats, be paranoid about negative values
+      int weight =
+          Math.max(1, (int)((d.getCacheRemainingPercent() / total) * 1000000));
+      offset += weight;
+      lottery.put(offset, d);
+    }
+    // Choose a number from [0, offset), which is the total amount of weight,
+    // to select the winner
+    DatanodeDescriptor winner =
+        lottery.higherEntry(random.nextInt(offset)).getValue();
+    return winner;
+  }
+}

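Two usage notes on the class above (illustrative, not part of the commit). First, the Preconditions in setNeedsRescan() and waitForRescanIfNeeded() imply a caller-side locking protocol: take the same ReentrantLock that was handed to the monitor's constructor, and do not hold the FSN write lock while waiting. A minimal sketch:

import java.util.concurrent.locks.ReentrantLock;

import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;

// Sketch only: how a NameNode component would flag pending cache changes and
// then wait for a rescan that has observed them.  'crmLock' must be the lock
// the monitor was constructed with; the FSN write lock must not be held here.
class CrmUsageSketch {
  static void markDirtyAndWait(CacheReplicationMonitor monitor,
      ReentrantLock crmLock) {
    crmLock.lock();
    try {
      monitor.setNeedsRescan();        // record that CacheManager state changed
      monitor.waitForRescanIfNeeded(); // releases crmLock while awaiting scanFinished
    } finally {
      crmLock.unlock();
    }
  }
}

Second, chooseRandomDatanodeByRemainingCapacity partitions a keyspace in proportion to each candidate's cache-remaining percentage: with three candidates at 50%, 30% and 20%, the cumulative offsets are 500000, 800000 and 1000000, and higherEntry(r) for a uniform r in [0, 1000000) selects them with probability 0.5, 0.3 and 0.2 respectively.
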
Propchange: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Wed Jan 22 21:43:00 2014
@@ -17,7 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -26,11 +34,15 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -99,6 +111,71 @@ public class DatanodeDescriptor extends 
   private final Map<String, DatanodeStorageInfo> storageMap = 
       new HashMap<String, DatanodeStorageInfo>();
 
+  /**
+   * A list of CachedBlock objects on this datanode.
+   */
+  public static class CachedBlocksList extends IntrusiveCollection<CachedBlock> {
+    public enum Type {
+      PENDING_CACHED,
+      CACHED,
+      PENDING_UNCACHED
+    }
+
+    private final DatanodeDescriptor datanode;
+
+    private final Type type;
+
+    CachedBlocksList(DatanodeDescriptor datanode, Type type) {
+      this.datanode = datanode;
+      this.type = type;
+    }
+
+    public DatanodeDescriptor getDatanode() {
+      return datanode;
+    }
+
+    public Type getType() {
+      return type;
+    }
+  }
+
+  /**
+   * The blocks which we want to cache on this DataNode.
+   */
+  private final CachedBlocksList pendingCached = 
+      new CachedBlocksList(this, CachedBlocksList.Type.PENDING_CACHED);
+
+  /**
+   * The blocks which we know are cached on this datanode.
+   * This list is updated by periodic cache reports.
+   */
+  private final CachedBlocksList cached = 
+      new CachedBlocksList(this, CachedBlocksList.Type.CACHED);
+
+  /**
+   * The blocks which we want to uncache on this DataNode.
+   */
+  private final CachedBlocksList pendingUncached = 
+      new CachedBlocksList(this, CachedBlocksList.Type.PENDING_UNCACHED);
+
+  public CachedBlocksList getPendingCached() {
+    return pendingCached;
+  }
+
+  public CachedBlocksList getCached() {
+    return cached;
+  }
+
+  public CachedBlocksList getPendingUncached() {
+    return pendingUncached;
+  }
+
+  /**
+   * The time when the last batch of caching directives was sent, in
+   * monotonic milliseconds.
+   */
+  private long lastCachingDirectiveSentTimeMs;
+
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   public boolean isAlive = false;
@@ -144,7 +221,7 @@ public class DatanodeDescriptor extends 
    */
   public DatanodeDescriptor(DatanodeID nodeID) {
     super(nodeID);
-    updateHeartbeat(StorageReport.EMPTY_ARRAY, 0, 0);
+    updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
   }
 
   /**
@@ -155,7 +232,7 @@ public class DatanodeDescriptor extends 
   public DatanodeDescriptor(DatanodeID nodeID, 
                             String networkLocation) {
     super(nodeID, networkLocation);
-    updateHeartbeat(StorageReport.EMPTY_ARRAY, 0, 0);
+    updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
   }
 
   /**
@@ -236,6 +313,11 @@ public class DatanodeDescriptor extends 
     setXceiverCount(0);
     this.invalidateBlocks.clear();
     this.volumeFailures = 0;
+    // pendingCached, cached, and pendingUncached are protected by the
+    // FSN lock.
+    this.pendingCached.clear();
+    this.cached.clear();
+    this.pendingUncached.clear();
   }
   
   public void clearBlockQueues() {
@@ -244,6 +326,11 @@ public class DatanodeDescriptor extends 
       this.recoverBlocks.clear();
       this.replicateBlocks.clear();
     }
+    // pendingCached, cached, and pendingUncached are protected by the
+    // FSN lock.
+    this.pendingCached.clear();
+    this.cached.clear();
+    this.pendingUncached.clear();
   }
 
   public int numBlocks() {
@@ -257,13 +344,15 @@ public class DatanodeDescriptor extends 
   /**
    * Updates stats from datanode heartbeat.
    */
-  public void updateHeartbeat(StorageReport[] reports,
-      int xceiverCount, int volFailures) {
+  public void updateHeartbeat(StorageReport[] reports, long cacheCapacity,
+      long cacheUsed, int xceiverCount, int volFailures) {
     long totalCapacity = 0;
     long totalRemaining = 0;
     long totalBlockPoolUsed = 0;
     long totalDfsUsed = 0;
 
+    setCacheCapacity(cacheCapacity);
+    setCacheUsed(cacheUsed);
     setXceiverCount(xceiverCount);
     setLastUpdate(Time.now());    
     this.volumeFailures = volFailures;
@@ -331,7 +420,7 @@ public class DatanodeDescriptor extends 
   Iterator<BlockInfo> getBlockIterator(final String storageID) {
     return new BlockIterator(getStorageInfo(storageID));
   }
-  
+
   /**
    * Store block replication work.
    */
@@ -363,7 +452,7 @@ public class DatanodeDescriptor extends 
       }
     }
   }
-
+  
   /**
    * The number of work items that are pending to be replicated
    */
@@ -380,7 +469,7 @@ public class DatanodeDescriptor extends 
       return invalidateBlocks.size();
     }
   }
-  
+
   public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
     return replicateBlocks.poll(maxTransfers);
   }
@@ -575,5 +664,21 @@ public class DatanodeDescriptor extends 
       return storage;
     }
   }
+
+  /**
+   * @return   The time at which we last sent caching directives to this 
+   *           DataNode, in monotonic milliseconds.
+   */
+  public long getLastCachingDirectiveSentTimeMs() {
+    return this.lastCachingDirectiveSentTimeMs;
+  }
+
+  /**
+   * @param time  The time at which we last sent caching directives to this 
+   *              DataNode, in monotonic milliseconds.
+   */
+  public void setLastCachingDirectiveSentTimeMs(long time) {
+    this.lastCachingDirectiveSentTimeMs = time;
+  }
 }
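
The DatanodeDescriptor changes above give each datanode three lists — pendingCached, cached, pendingUncached — backed by IntrusiveCollection and protected by the FSNamesystem lock. Below is a deliberately simplified model of how block ids might move between those three states, using plain sets instead of Hadoop's IntrusiveCollection; it is a sketch of the state machine, not the NameNode's actual bookkeeping.

import java.util.LinkedHashSet;
import java.util.Set;

/** Simplified per-datanode caching state: three sets of block ids. */
public class CachingState {
  private final Set<Long> pendingCached = new LinkedHashSet<Long>();
  private final Set<Long> cached = new LinkedHashSet<Long>();
  private final Set<Long> pendingUncached = new LinkedHashSet<Long>();

  /** The NameNode wants this block cached on the datanode. */
  public void wantCached(long blockId) {
    pendingUncached.remove(blockId);
    if (!cached.contains(blockId)) {
      pendingCached.add(blockId);
    }
  }

  /** The NameNode wants this block dropped from the datanode's cache. */
  public void wantUncached(long blockId) {
    pendingCached.remove(blockId);
    if (cached.contains(blockId)) {
      pendingUncached.add(blockId);
    }
  }

  /** Apply a periodic cache report listing what the datanode actually caches. */
  public void onCacheReport(Set<Long> reported) {
    cached.clear();
    cached.addAll(reported);
    // Blocks now confirmed cached no longer need a cache command.
    pendingCached.removeAll(reported);
    // Blocks the datanode already dropped no longer need an uncache command.
    pendingUncached.retainAll(reported);
  }
}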
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Wed Jan 22 21:43:00 2014
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
@@ -144,6 +146,12 @@ public class DatanodeManager {
 
   private final boolean checkIpHostnameInRegistration;
   /**
+   * Whether we should tell datanodes what to cache in replies to
+   * heartbeat messages.
+   */
+  private boolean shouldSendCachingCommands = false;
+
+  /**
    * The number of datanodes for each software version. This list should change
    * during rolling upgrades.
    * Software version -> Number of datanodes with this version
@@ -151,6 +159,16 @@ public class DatanodeManager {
   private HashMap<String, Integer> datanodesSoftwareVersions =
     new HashMap<String, Integer>(4, 0.75f);
   
+  /**
+   * The minimum time between resending caching directives to Datanodes,
+   * in milliseconds.
+   *
+   * Note that when a rescan happens, we will send the new directives
+   * as soon as possible.  This timeout only applies to resending 
+   * directives that we've already sent.
+   */
+  private final long timeBetweenResendingCachingDirectivesMs;
+  
   DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -233,6 +251,9 @@ public class DatanodeManager {
         DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
         " = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
         "It should be a positive non-zero float value, not greater than 1.0f.");
+    this.timeBetweenResendingCachingDirectivesMs = conf.getLong(
+        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
+        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT);
   }
 
   private static long getStaleIntervalFromConf(Configuration conf,
@@ -1203,7 +1224,8 @@ public class DatanodeManager {
   /** Handle heartbeat from datanodes. */
   public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       StorageReport[] reports, final String blockPoolId,
-      int xceiverCount, int maxTransfers, int failedVolumes
+      long cacheCapacity, long cacheUsed, int xceiverCount, 
+      int maxTransfers, int failedVolumes
       ) throws IOException {
     synchronized (heartbeatManager) {
       synchronized (datanodeMap) {
@@ -1225,6 +1247,7 @@ public class DatanodeManager {
         }
 
         heartbeatManager.updateHeartbeat(nodeinfo, reports,
+                                         cacheCapacity, cacheUsed,
                                          xceiverCount, failedVolumes);
 
         // If we are in safemode, do not send back any recovery / replication
@@ -1286,7 +1309,30 @@ public class DatanodeManager {
           cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
               blockPoolId, blks));
         }
-        
+        boolean sendingCachingCommands = false;
+        long nowMs = Time.monotonicNow();
+        if (shouldSendCachingCommands && 
+            ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
+                timeBetweenResendingCachingDirectivesMs)) {
+          DatanodeCommand pendingCacheCommand =
+              getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
+                DatanodeProtocol.DNA_CACHE, blockPoolId);
+          if (pendingCacheCommand != null) {
+            cmds.add(pendingCacheCommand);
+            sendingCachingCommands = true;
+          }
+          DatanodeCommand pendingUncacheCommand =
+              getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
+                DatanodeProtocol.DNA_UNCACHE, blockPoolId);
+          if (pendingUncacheCommand != null) {
+            cmds.add(pendingUncacheCommand);
+            sendingCachingCommands = true;
+          }
+          if (sendingCachingCommands) {
+            nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
+          }
+        }
+
         blockManager.addKeyUpdateCommand(cmds, nodeinfo);
 
         // check for balancer bandwidth update
@@ -1306,6 +1352,34 @@ public class DatanodeManager {
   }
 
   /**
+   * Convert a CachedBlocksList into a DatanodeCommand with a list of blocks.
+   *
+   * @param list       The {@link CachedBlocksList}. The list itself is
+   *                   not modified by this call.
+   * @param datanode   The datanode.
+   * @param action     The action to perform in the command.
+   * @param poolId     The block pool id.
+   * @return           A DatanodeCommand to be sent back to the DN, or null if
+   *                   there is nothing to be done.
+   */
+  private DatanodeCommand getCacheCommand(CachedBlocksList list,
+      DatanodeDescriptor datanode, int action, String poolId) {
+    int length = list.size();
+    if (length == 0) {
+      return null;
+    }
+    // Copy the block ids out of the list.
+    long[] blockIds = new long[length];
+    int i = 0;
+    for (Iterator<CachedBlock> iter = list.iterator();
+            iter.hasNext(); ) {
+      CachedBlock cachedBlock = iter.next();
+      blockIds[i++] = cachedBlock.getBlockId();
+    }
+    return new BlockIdCommand(action, poolId, blockIds);
+  }
+
+  /**
    * Tell all datanodes to use a new, non-persistent bandwidth value for
    * dfs.balance.bandwidthPerSec.
    *
@@ -1351,9 +1425,32 @@ public class DatanodeManager {
     }
   }
 
+  /**
+   * Reset the lastCachingDirectiveSentTimeMs field of all the DataNodes we
+   * know about.
+   */
+  public void resetLastCachingDirectiveSentTime() {
+    synchronized (datanodeMap) {
+      for (DatanodeDescriptor dn : datanodeMap.values()) {
+        dn.setLastCachingDirectiveSentTimeMs(0L);
+      }
+    }
+  }
+
   @Override
   public String toString() {
     return getClass().getSimpleName() + ": " + host2DatanodeMap;
   }
+
+  public void clearPendingCachingCommands() {
+    for (DatanodeDescriptor dn : datanodeMap.values()) {
+      dn.getPendingCached().clear();
+      dn.getPendingUncached().clear();
+    }
+  }
+
+  public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
+    this.shouldSendCachingCommands = shouldSendCachingCommands;
+  }
 }
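
handleHeartbeat above only piggybacks DNA_CACHE/DNA_UNCACHE commands when shouldSendCachingCommands is set and at least timeBetweenResendingCachingDirectivesMs has passed since the last send, and it only advances the per-node timestamp when a command was actually queued. The sketch below isolates that throttle with an injected clock (the real code uses Time.monotonicNow()); the class name is ours, not Hadoop's.

/** Resend throttle for per-datanode caching directives. */
public class DirectiveResendThrottle {
  /** Injected monotonic clock so the throttle is testable. */
  public interface Clock {
    long monotonicNowMs();
  }

  private final long intervalMs;
  private final Clock clock;
  private long lastSentMs = 0L;

  public DirectiveResendThrottle(long intervalMs, Clock clock) {
    this.intervalMs = intervalMs;
    this.clock = clock;
  }

  /** @return true if directives may be (re)sent in this heartbeat reply. */
  public boolean mayResend() {
    return clock.monotonicNowMs() - lastSentMs >= intervalMs;
  }

  /** Record that a caching command was actually added to the reply. */
  public void markSent() {
    lastSentMs = clock.monotonicNowMs();
  }

  /** Force a resend on the next heartbeat, as resetLastCachingDirectiveSentTime does. */
  public void reset() {
    lastSentMs = 0L;
  }
}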
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Wed Jan 22 21:43:00 2014
@@ -42,6 +42,12 @@ public interface DatanodeStatistics {
 
   /** @return the percentage of the block pool used space over the total capacity. */
   public float getPercentBlockPoolUsed();
+  
+  /** @return the total cache capacity of all DataNodes */
+  public long getCacheCapacity();
+
+  /** @return the total cache used by all DataNodes */
+  public long getCacheUsed();
 
   /** @return the xceiver count */
   public int getXceiverCount();
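
With the two getters added above, callers can derive an aggregate cache-utilization figure alongside the existing percent metrics. A trivial hedged helper; the zero-capacity convention here is our assumption, not necessarily what the NameNode metrics report.

public final class CacheStatsUtil {
  private CacheStatsUtil() {}

  /** Percent of aggregate cache capacity currently in use. */
  public static float percentCacheUsed(long cacheCapacity, long cacheUsed) {
    if (cacheCapacity <= 0) {
      return 0.0f;  // assumed convention when no datanode advertises cache
    }
    return (cacheUsed * 100.0f) / cacheCapacity;
  }
}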

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Wed Jan 22 21:43:00 2014
@@ -149,6 +149,17 @@ class HeartbeatManager implements Datano
   public synchronized int getXceiverCount() {
     return stats.xceiverCount;
   }
+  
+  @Override
+  public synchronized long getCacheCapacity() {
+    return stats.cacheCapacity;
+  }
+
+  @Override
+  public synchronized long getCacheUsed() {
+    return stats.cacheUsed;
+  }
+  
 
   @Override
   public synchronized long[] getStats() {
@@ -171,7 +182,7 @@ class HeartbeatManager implements Datano
       addDatanode(d);
 
       //update its timestamp
-      d.updateHeartbeat(StorageReport.EMPTY_ARRAY, 0, 0);
+      d.updateHeartbeat(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0);
     }
   }
 
@@ -193,9 +204,11 @@ class HeartbeatManager implements Datano
   }
 
   synchronized void updateHeartbeat(final DatanodeDescriptor node,
-      StorageReport[] reports, int xceiverCount, int failedVolumes) {
+      StorageReport[] reports, long cacheCapacity, long cacheUsed,
+      int xceiverCount, int failedVolumes) {
     stats.subtract(node);
-    node.updateHeartbeat(reports, xceiverCount, failedVolumes);
+    node.updateHeartbeat(reports, cacheCapacity, cacheUsed,
+        xceiverCount, failedVolumes);
     stats.add(node);
   }
 
@@ -307,6 +320,8 @@ class HeartbeatManager implements Datano
     private long capacityRemaining = 0L;
     private long blockPoolUsed = 0L;
     private int xceiverCount = 0;
+    private long cacheCapacity = 0L;
+    private long cacheUsed = 0L;
 
     private int expiredHeartbeats = 0;
 
@@ -320,6 +335,8 @@ class HeartbeatManager implements Datano
       } else {
         capacityTotal += node.getDfsUsed();
       }
+      cacheCapacity += node.getCacheCapacity();
+      cacheUsed += node.getCacheUsed();
     }
 
     private void subtract(final DatanodeDescriptor node) {
@@ -332,6 +349,8 @@ class HeartbeatManager implements Datano
       } else {
         capacityTotal -= node.getDfsUsed();
       }
+      cacheCapacity -= node.getCacheCapacity();
+      cacheUsed -= node.getCacheUsed();
     }
     
     /** Increment expired heartbeat counter. */
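
The Stats changes above keep cluster-wide cache totals consistent by calling subtract(node) with the node's old values before the heartbeat update and add(node) with the new ones afterwards. A stripped-down sketch of that subtract-then-add pattern, limited to the two new counters (the class and method names are ours):

/** Cluster-wide cache totals maintained with the subtract-then-add pattern. */
public class CacheTotals {
  private long cacheCapacity = 0L;
  private long cacheUsed = 0L;

  public synchronized void add(long nodeCapacity, long nodeUsed) {
    cacheCapacity += nodeCapacity;
    cacheUsed += nodeUsed;
  }

  public synchronized void subtract(long nodeCapacity, long nodeUsed) {
    cacheCapacity -= nodeCapacity;
    cacheUsed -= nodeUsed;
  }

  /** Heartbeat update: remove the node's stale contribution, then add the fresh one. */
  public synchronized void onHeartbeat(long oldCapacity, long oldUsed,
      long newCapacity, long newUsed) {
    subtract(oldCapacity, oldUsed);
    add(newCapacity, newUsed);
  }

  public synchronized long getCacheCapacity() { return cacheCapacity; }
  public synchronized long getCacheUsed() { return cacheUsed; }
}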

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Wed Jan 22 21:43:00 2014
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -538,6 +539,21 @@ class BPOfferService {
   }
   }
 
+  private String blockIdArrayToString(long ids[]) {
+    long maxNumberOfBlocksToLog = dn.getMaxNumberOfBlocksToLog();
+    StringBuilder bld = new StringBuilder();
+    String prefix = "";
+    for (int i = 0; i < ids.length; i++) {
+      if (i >= maxNumberOfBlocksToLog) {
+        bld.append("...");
+        break;
+      }
+      bld.append(prefix).append(ids[i]);
+      prefix = ", ";
+    }
+    return bld.toString();
+  }
+
   /**
    * This method should handle all commands from Active namenode except
    * DNA_REGISTER which should be handled earlier itself.
@@ -550,6 +566,8 @@ class BPOfferService {
       BPServiceActor actor) throws IOException {
     final BlockCommand bcmd = 
       cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+    final BlockIdCommand blockIdCmd = 
+      cmd instanceof BlockIdCommand ? (BlockIdCommand)cmd: null;
 
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_TRANSFER:
@@ -575,6 +593,20 @@ class BPOfferService {
       }
       dn.metrics.incrBlocksRemoved(toDelete.length);
       break;
+    case DatanodeProtocol.DNA_CACHE:
+      LOG.info("DatanodeCommand action: DNA_CACHE for " +
+        blockIdCmd.getBlockPoolId() + " of [" +
+          blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
+      dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+      dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
+      break;
+    case DatanodeProtocol.DNA_UNCACHE:
+      LOG.info("DatanodeCommand action: DNA_UNCACHE for " +
+        blockIdCmd.getBlockPoolId() + " of [" +
+          blockIdArrayToString(blockIdCmd.getBlockIds()) + "]");
+      dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+      dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
+      break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
       // See HDFS-2987.
@@ -639,6 +671,8 @@ class BPOfferService {
     case DatanodeProtocol.DNA_FINALIZE:
     case DatanodeProtocol.DNA_RECOVERBLOCK:
     case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+    case DatanodeProtocol.DNA_CACHE:
+    case DatanodeProtocol.DNA_UNCACHE:
       LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
       break;
     default:
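
The BPOfferService changes above route two new actions, DNA_CACHE and DNA_UNCACHE, to the dataset (dn.getFSDataset().cache(...)/uncache(...)) when the command comes from the active NameNode. The sketch below reduces that dispatch to its core; CacheTarget is a stand-in interface and the action constants are illustrative values, not the real DatanodeProtocol ones.

/** Minimal dispatch of the two caching actions from an active NameNode. */
public class CacheCommandDispatcher {
  /** Stand-in for the dataset calls used in the handler above. */
  public interface CacheTarget {
    void cache(String blockPoolId, long[] blockIds);
    void uncache(String blockPoolId, long[] blockIds);
  }

  // Illustrative values only; the real constants live in DatanodeProtocol.
  public static final int DNA_CACHE = 1;
  public static final int DNA_UNCACHE = 2;

  private final CacheTarget target;

  public CacheCommandDispatcher(CacheTarget target) {
    this.target = target;
  }

  /** @return true if the action was a caching action and was handled. */
  public boolean dispatchFromActive(int action, String blockPoolId,
      long[] blockIds) {
    switch (action) {
    case DNA_CACHE:
      target.cache(blockPoolId, blockIds);
      return true;
    case DNA_UNCACHE:
      target.uncache(blockPoolId, blockIds);
      return true;
    default:
      return false;
    }
  }
}

As the second hunk above shows, the same two actions arriving from a standby NameNode are logged and ignored rather than dispatched.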

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Wed Jan 22 21:43:00 2014
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -86,6 +87,8 @@ class BPServiceActor implements Runnable
 
   boolean resetBlockReportTime = true;
 
+  volatile long lastCacheReport = 0;
+
   Thread bpThread;
   DatanodeProtocolClientSideTranslatorPB bpNamenode;
   private volatile long lastHeartbeat = 0;
@@ -503,6 +506,35 @@ class BPServiceActor implements Runnable
     return cmd;
   }
   
+  DatanodeCommand cacheReport() throws IOException {
+    // If caching is disabled, do not send a cache report
+    if (dn.getFSDataset().getCacheCapacity() == 0) {
+      return null;
+    }
+    // send cache report if timer has expired.
+    DatanodeCommand cmd = null;
+    long startTime = Time.monotonicNow();
+    if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending cacheReport from service actor: " + this);
+      }
+      lastCacheReport = startTime;
+
+      String bpid = bpos.getBlockPoolId();
+      List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
+      long createTime = Time.monotonicNow();
+
+      cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
+      long sendTime = Time.monotonicNow();
+      long createCost = createTime - startTime;
+      long sendCost = sendTime - createTime;
+      dn.getMetrics().addCacheReport(sendCost);
+      LOG.debug("CacheReport of " + blockIds.size()
+          + " block(s) took " + createCost + " msec to generate and "
+          + sendCost + " msecs for RPC and NN processing");
+    }
+    return cmd;
+  }
   
   HeartbeatResponse sendHeartBeat() throws IOException {
     StorageReport[] reports =
@@ -514,6 +546,8 @@ class BPServiceActor implements Runnable
 
     return bpNamenode.sendHeartbeat(bpRegistration,
         reports,
+        dn.getFSDataset().getCacheCapacity(),
+        dn.getFSDataset().getCacheUsed(),
         dn.getXmitsInProgress(),
         dn.getXceiverCount(),
         dn.getFSDataset().getNumFailedVolumes());
@@ -567,11 +601,12 @@ class BPServiceActor implements Runnable
    * forever calling remote NameNode functions.
    */
   private void offerService() throws Exception {
-    LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
-        + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
-        + dnConf.blockReportInterval + "msec" + " Initial delay: "
-        + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
-        + dnConf.heartBeatInterval);
+    LOG.info("For namenode " + nnAddr + " using"
+        + " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
+        + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
+        + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
+        + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
+        + "; heartBeatInterval=" + dnConf.heartBeatInterval);
 
     //
     // Now loop for a long time....
@@ -627,6 +662,9 @@ class BPServiceActor implements Runnable
         DatanodeCommand cmd = blockReport();
         processCommand(new DatanodeCommand[]{ cmd });
 
+        cmd = cacheReport();
+        processCommand(new DatanodeCommand[]{ cmd });
+
         // Now safe to start scanning the block pool.
         // If it has already been started, this is a no-op.
         if (dn.blockScanner != null) {
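
The new cacheReport() above sends a report only when the configured interval has elapsed since the last one, and measures the time to build the block-id list separately from the time spent in the RPC. The sketch below captures that scheduling with injected stand-ins for the clock, the dataset, and the NameNode so it is self-contained; none of these interfaces are the actual Hadoop types.

import java.util.List;

/** Interval-gated cache reporting, modelled on cacheReport() above. */
public class IntervalCacheReporter {
  public interface Clock { long monotonicNowMs(); }
  public interface ReportSource { List<Long> getCacheReport(); }
  public interface ReportSink { void send(List<Long> blockIds); }

  private final long intervalMs;
  private final Clock clock;
  private final ReportSource source;
  private final ReportSink sink;
  private volatile long lastReportMs = 0L;

  public IntervalCacheReporter(long intervalMs, Clock clock,
      ReportSource source, ReportSink sink) {
    this.intervalMs = intervalMs;
    this.clock = clock;
    this.source = source;
    this.sink = sink;
  }

  /** @return true if a report was sent this round, false if still inside the interval. */
  public boolean maybeReport() {
    long start = clock.monotonicNowMs();
    if (start - lastReportMs <= intervalMs) {
      return false;
    }
    lastReportMs = start;
    List<Long> blockIds = source.getCacheReport();
    long built = clock.monotonicNowMs();
    sink.send(blockIds);
    long sent = clock.monotonicNowMs();
    System.out.println("cache report: " + blockIds.size() + " block(s), "
        + (built - start) + " ms to build, " + (sent - built) + " ms to send");
    return true;
  }
}

The real method additionally returns immediately when the dataset advertises zero cache capacity, so DataNodes without caching configured never send these reports.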

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Jan 22 21:43:00 2014
@@ -664,8 +664,9 @@ class BlockReceiver implements Closeable
         //                     
         long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
         if (dropPos > 0 && dropCacheBehindWrites) {
-          NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
-              outFd, 0, dropPos, NativeIO.POSIX.POSIX_FADV_DONTNEED);
+          NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+              block.getBlockName(), outFd, 0, dropPos,
+              NativeIO.POSIX.POSIX_FADV_DONTNEED);
         }
         lastCacheManagementOffset = offsetInBlock;
       }

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Wed Jan 22 21:43:00 2014
@@ -375,8 +375,9 @@ class BlockSender implements java.io.Clo
         ((dropCacheBehindAllReads) ||
          (dropCacheBehindLargeReads && isLongRead()))) {
       try {
-        NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
-            blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+            block.getBlockName(), blockInFd, lastCacheDropOffset,
+            offset - lastCacheDropOffset,
             NativeIO.POSIX.POSIX_FADV_DONTNEED);
       } catch (Exception e) {
         LOG.warn("Unable to drop cache on file close", e);
@@ -674,8 +675,9 @@ class BlockSender implements java.io.Clo
 
     if (isLongRead() && blockInFd != null) {
       // Advise that this file descriptor will be accessed sequentially.
-      NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
-          blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
+      NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+          block.getBlockName(), blockInFd, 0, 0,
+          NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
     }
     
     // Trigger readahead of beginning of file if configured.
@@ -761,9 +763,9 @@ class BlockSender implements java.io.Clo
       long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
       if (offset >= nextCacheDropOffset) {
         long dropLength = offset - lastCacheDropOffset;
-        NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
-            blockInFd, lastCacheDropOffset, dropLength,
-            NativeIO.POSIX.POSIX_FADV_DONTNEED);
+        NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
+            block.getBlockName(), blockInFd, lastCacheDropOffset,
+            dropLength, NativeIO.POSIX.POSIX_FADV_DONTNEED);
         lastCacheDropOffset = offset;
       }
     }
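
Both BlockReceiver and BlockSender above switch from the static NativeIO.POSIX.posixFadviseIfPossible(...) call to NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(...): page-cache advice now goes through a swappable object so tests can substitute a no-op or a recording fake. The sketch below shows that seam in isolation; the interface, method, and class names are ours, not the actual NativeIO API.

/** Swappable seam for OS page-cache advice, mirroring the indirection above. */
public class CacheAdvice {
  public interface CacheManipulator {
    void adviseDontNeed(String name, long offset, long length);
  }

  /** Default: real code would issue posix_fadvise(POSIX_FADV_DONTNEED) here. */
  private static volatile CacheManipulator manipulator = new CacheManipulator() {
    @Override
    public void adviseDontNeed(String name, long offset, long length) {
      // native call omitted in this sketch
    }
  };

  /** Test hook: swap in a fake manipulator. */
  public static void setCacheManipulator(CacheManipulator m) {
    manipulator = m;
  }

  public static CacheManipulator getCacheManipulator() {
    return manipulator;
  }
}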

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1560522&r1=1560521&r2=1560522&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Wed Jan 22 21:43:00 2014
@@ -18,13 +18,18 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
@@ -39,6 +44,7 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -66,6 +72,7 @@ public class DNConf {
   final long blockReportInterval;
   final long deleteReportInterval;
   final long initialBlockReportDelay;
+  final long cacheReportInterval;
   final int writePacketSize;
   
   final String minimumNameNodeVersion;
@@ -73,6 +80,8 @@ public class DNConf {
   
   final long xceiverStopTimeout;
 
+  final long maxLockedMemory;
+
   public DNConf(Configuration conf) {
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
         HdfsServerConstants.READ_TIMEOUT);
@@ -107,7 +116,9 @@ public class DNConf {
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
-    DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+        DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
+        DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
     
     long initBRDelay = conf.getLong(
         DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
@@ -137,6 +148,10 @@ public class DNConf {
     this.xceiverStopTimeout = conf.getLong(
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
+
+    this.maxLockedMemory = conf.getLong(
+        DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
   }
   
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -147,4 +162,8 @@ public class DNConf {
   public long getXceiverStopTimeout() {
     return xceiverStopTimeout;
   }
+
+  public long getMaxLockedMemory() {
+    return maxLockedMemory;
+  }
 }
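
DNConf above wires in two new settings with the usual key/default pattern: the cache report interval and the DataNode's max-locked-memory limit. A hedged usage sketch using the same DFSConfigKeys constants the diff imports; the concrete values set here are examples only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class DnCacheConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Example values: report cached blocks every 10 seconds and allow the
    // DataNode to lock up to 64 MB of block data in memory.
    conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 10 * 1000L);
    conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, 64L * 1024 * 1024);

    long cacheReportInterval = conf.getLong(
        DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
        DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
    long maxLockedMemory = conf.getLong(
        DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
        DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
    System.out.println("cacheReportInterval=" + cacheReportInterval
        + " maxLockedMemory=" + maxLockedMemory);
  }
}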


