hadoop-hdfs-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1555021 [7/15] - in /hadoop/common/branches/HDFS-5535/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache...
Date: Fri, 03 Jan 2014 07:27:01 GMT
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Fri Jan  3 07:26:52 2014
@@ -38,13 +38,14 @@ import javax.management.NotCompliantMBea
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.server.common.GenerationStamp;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -66,6 +68,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@@ -76,7 +79,9 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
@@ -108,6 +113,26 @@ class FsDatasetImpl implements FsDataset
     return volumes.volumes;
   }
 
+  @Override // FsDatasetSpi
+  public StorageReport[] getStorageReports(String bpid)
+      throws IOException {
+    StorageReport[] reports;
+    synchronized (statsLock) {
+      reports = new StorageReport[volumes.volumes.size()];
+      int i = 0;
+      for (FsVolumeImpl volume : volumes.volumes) {
+        reports[i++] = new StorageReport(volume.getStorageID(),
+                                         false,
+                                         volume.getCapacity(),
+                                         volume.getDfsUsed(),
+                                         volume.getAvailable(),
+                                         volume.getBlockPoolUsed(bpid));
+      }
+    }
+
+    return reports;
+  }
+
   @Override
   public synchronized FsVolumeImpl getVolume(final ExtendedBlock b) {
     final ReplicaInfo r =  volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
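[Editor's note] The new getStorageReports snapshots per-volume usage under statsLock so the heartbeat path can report each storage separately instead of a single datanode-wide summary. A hypothetical consumer sketch (the accessor names are assumptions, not shown in this diff):

    // Hypothetical consumer sketch; getCapacity()/getDfsUsed()
    // accessor names are assumed.
    StorageReport[] reports = dataset.getStorageReports(bpid);
    long capacity = 0, dfsUsed = 0;
    for (StorageReport r : reports) {
      capacity += r.getCapacity();
      dfsUsed += r.getDfsUsed();
    }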
@@ -169,11 +194,12 @@ class FsDatasetImpl implements FsDataset
     
   final DataNode datanode;
   final FsVolumeList volumes;
-  final ReplicaMap volumeMap;
   final FsDatasetAsyncDiskService asyncDiskService;
   final FsDatasetCache cacheManager;
   private final int validVolsRequired;
 
+  final ReplicaMap volumeMap;
+
   // Used for synchronizing access to usage stats
   private final Object statsLock = new Object();
 
@@ -190,6 +216,7 @@ class FsDatasetImpl implements FsDataset
                   DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT);
 
     String[] dataDirs = conf.getTrimmedStrings(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY);
+    Collection<StorageLocation> dataLocations = DataNode.getStorageLocations(conf);
 
     int volsConfigured = (dataDirs == null) ? 0 : dataDirs.length;
     int volsFailed = volsConfigured - storage.getNumStorageDirs();
@@ -210,9 +237,15 @@ class FsDatasetImpl implements FsDataset
     final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
         storage.getNumStorageDirs());
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
-      final File dir = storage.getStorageDir(idx).getCurrentDir();
-      volArray.add(new FsVolumeImpl(this, storage.getStorageID(), dir, conf));
-      LOG.info("Added volume - " + dir);
+      // TODO: getStorageTypeFromLocations() is only a temporary workaround;
+      // it should be replaced by reading the storage type directly from
+      // DataStorage, which does not record storage types yet.
+      Storage.StorageDirectory sd = storage.getStorageDir(idx);
+      final File dir = sd.getCurrentDir();
+      final StorageType storageType = getStorageTypeFromLocations(dataLocations, dir);
+      volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
+          storageType));
+      LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
     }
     volumeMap = new ReplicaMap(this);
 
@@ -223,7 +256,7 @@ class FsDatasetImpl implements FsDataset
             RoundRobinVolumeChoosingPolicy.class,
             VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
-    volumes.getVolumeMap(volumeMap);
+    volumes.initializeReplicaMaps(volumeMap);
 
     File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
@@ -231,7 +264,17 @@ class FsDatasetImpl implements FsDataset
     }
     asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
     cacheManager = new FsDatasetCache(this);
-    registerMBean(storage.getStorageID());
+    registerMBean(datanode.getDatanodeUuid());
+  }
+
+  private StorageType getStorageTypeFromLocations(
+      Collection<StorageLocation> dataLocations, File dir) {
+    for (StorageLocation dataLocation : dataLocations) {
+      if (dataLocation.getFile().equals(dir)) {
+        return dataLocation.getStorageType();
+      }
+    }
+    return StorageType.DEFAULT;
   }
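[Editor's note] getStorageTypeFromLocations falls back to StorageType.DEFAULT when a directory has no matching configured location. For context, the HDFS-5535 branch lets dfs.datanode.data.dir entries carry a storage-type prefix; the parsing below is a hypothetical sketch only, since the real logic lives in DataNode.getStorageLocations and StorageLocation, which are not part of this diff:

    // Hypothetical sketch of "[TYPE]/path" parsing; the bracketed-prefix
    // syntax and this helper are assumptions, not shown in this diff.
    static StorageType parseStorageType(String dataDirEntry) {
      if (dataDirEntry.startsWith("[")) {
        int end = dataDirEntry.indexOf(']');  // assumes well-formed entry
        return StorageType.valueOf(
            dataDirEntry.substring(1, end).toUpperCase());
      }
      return StorageType.DEFAULT;  // no prefix configured
    }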
 
   /**
@@ -336,9 +379,6 @@ class FsDatasetImpl implements FsDataset
   File getBlockFile(String bpid, Block b) throws IOException {
     File f = validateBlockFile(bpid, b);
     if(f == null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
-      }
       throw new IOException("Block " + b + " is not valid.");
     }
     return f;
@@ -684,7 +724,7 @@ class FsDatasetImpl implements FsDataset
   }
 
   @Override // FsDatasetSpi
-  public void recoverClose(ExtendedBlock b, long newGS,
+  public String recoverClose(ExtendedBlock b, long newGS,
       long expectedBlockLen) throws IOException {
     LOG.info("Recover failed close " + b);
     // check replica's state
@@ -695,6 +735,7 @@ class FsDatasetImpl implements FsDataset
     if (replicaInfo.getState() == ReplicaState.RBW) {
       finalizeReplica(b.getBlockPoolId(), replicaInfo);
     }
+    return replicaInfo.getStorageUuid();
   }
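[Editor's note] recoverClose now returns the recovered replica's storage UUID, so the caller can tell the NameNode which storage holds the block rather than only which datanode. A hypothetical call site (the real one, pipeline recovery in DataNode, is not part of this diff):

    // Hypothetical caller sketch.
    String storageUuid = dataset.recoverClose(block, newGenStamp, expectedLen);
    // Report the block against the specific storage that holds it.
    notifyNamenodeReceivedBlock(block, storageUuid);  // hypothetical helper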
   
   /**
@@ -995,56 +1036,68 @@ class FsDatasetImpl implements FsDataset
     return true;
   }
 
-  /**
-   * Generates a block report from the in-memory block map.
-   */
   @Override // FsDatasetSpi
-  public BlockListAsLongs getBlockReport(String bpid) {
-    int size =  volumeMap.size(bpid);
-    ArrayList<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(size);
-    ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
-    if (size == 0) {
-      return new BlockListAsLongs(finalized, uc);
+  public List<Long> getCacheReport(String bpid) {
+    return cacheManager.getCachedBlocks(bpid);
+  }
+
+  @Override
+  public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
+    Map<DatanodeStorage, BlockListAsLongs> blockReportsMap =
+        new HashMap<DatanodeStorage, BlockListAsLongs>();
+
+    Map<String, ArrayList<ReplicaInfo>> finalized =
+        new HashMap<String, ArrayList<ReplicaInfo>>();
+    Map<String, ArrayList<ReplicaInfo>> uc =
+        new HashMap<String, ArrayList<ReplicaInfo>>();
+
+    for (FsVolumeSpi v : volumes.volumes) {
+      finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
+      uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
     }
-    
+
     synchronized(this) {
       for (ReplicaInfo b : volumeMap.replicas(bpid)) {
         switch(b.getState()) {
-        case FINALIZED:
-          finalized.add(b);
-          break;
-        case RBW:
-        case RWR:
-          uc.add(b);
-          break;
-        case RUR:
-          ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
-          uc.add(rur.getOriginalReplica());
-          break;
-        case TEMPORARY:
-          break;
-        default:
-          assert false : "Illegal ReplicaInfo state.";
+          case FINALIZED:
+            finalized.get(b.getVolume().getStorageID()).add(b);
+            break;
+          case RBW:
+          case RWR:
+            uc.get(b.getVolume().getStorageID()).add(b);
+            break;
+          case RUR:
+            ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
+            uc.get(rur.getVolume().getStorageID()).add(rur.getOriginalReplica());
+            break;
+          case TEMPORARY:
+            break;
+          default:
+            assert false : "Illegal ReplicaInfo state.";
         }
       }
-      return new BlockListAsLongs(finalized, uc);
     }
-  }
 
-  @Override // FsDatasetSpi
-  public List<Long> getCacheReport(String bpid) {
-    return cacheManager.getCachedBlocks(bpid);
+    for (FsVolumeImpl v : volumes.volumes) {
+      ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID());
+      ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID());
+      blockReportsMap.put(v.toDatanodeStorage(),
+                          new BlockListAsLongs(finalizedList, ucList));
+    }
+
+    return blockReportsMap;
   }
 
   /**
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<Block> getFinalizedBlocks(String bpid) {
-    ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size(bpid));
+  public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+    ArrayList<FinalizedReplica> finalized =
+        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
     for (ReplicaInfo b : volumeMap.replicas(bpid)) {
       if(b.getState() == ReplicaState.FINALIZED) {
-        finalized.add(new Block(b));
+        finalized.add(new FinalizedReplica((FinalizedReplica)b));
       }
     }
     return finalized;
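[Editor's note] With getBlockReport replaced by getBlockReports, replicas are bucketed by the storage ID of their volume and keyed by DatanodeStorage, so a caller can send one block report per storage instead of one per datanode. A hypothetical consumer sketch (sendBlockReport is an assumed RPC helper):

    // Hypothetical consumer sketch; the Map signature is from this diff.
    Map<DatanodeStorage, BlockListAsLongs> reports =
        dataset.getBlockReports(bpid);
    for (Map.Entry<DatanodeStorage, BlockListAsLongs> e : reports.entrySet()) {
      sendBlockReport(e.getKey(), e.getValue());  // assumed helper
    }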
@@ -1333,22 +1386,15 @@ class FsDatasetImpl implements FsDataset
   
   /**
    * Register the FSDataset MBean using the name
-   *        "hadoop:service=DataNode,name=FSDatasetState-<storageid>"
+   *        "hadoop:service=DataNode,name=FSDatasetState-<datanodeUuid>"
    */
-  void registerMBean(final String storageId) {
+  void registerMBean(final String datanodeUuid) {
    // We wrap to bypass the standard MBean naming convention.
    // This wrapping can be removed in Java 6, which is more flexible in
    // package naming for MBeans and their implementations.
-    StandardMBean bean;
-    String storageName;
-    if (storageId == null || storageId.equals("")) {// Temp fix for the uninitialized storage
-      storageName = "UndefinedStorageId" + DFSUtil.getRandom().nextInt();
-    } else {
-      storageName = storageId;
-    }
     try {
-      bean = new StandardMBean(this,FSDatasetMBean.class);
-      mbeanName = MBeans.register("DataNode", "FSDatasetState-" + storageName, bean);
+      StandardMBean bean = new StandardMBean(this,FSDatasetMBean.class);
+      mbeanName = MBeans.register("DataNode", "FSDatasetState-" + datanodeUuid, bean);
     } catch (NotCompliantMBeanException e) {
       LOG.warn("Error registering FSDatasetState MBean", e);
     }
@@ -1724,7 +1770,7 @@ class FsDatasetImpl implements FsDataset
     LOG.info("Adding block pool " + bpid);
     volumes.addBlockPool(bpid, conf);
     volumeMap.initBlockPool(bpid);
-    volumes.getVolumeMap(bpid, volumeMap);
+    volumes.getAllVolumesMap(bpid, volumeMap);
   }
 
   @Override
@@ -1734,11 +1780,6 @@ class FsDatasetImpl implements FsDataset
     volumes.removeBlockPool(bpid);
   }
   
-  @Override
-  public String[] getBlockPoolList() {
-    return volumeMap.getBlockPoolList();
-  }
-  
   /**
    * Class for representing the Datanode volume information
    */
@@ -1871,3 +1912,4 @@ class FsDatasetImpl implements FsDataset
     return new RollingLogsImpl(dir, prefix);
   }
 }
+

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Fri Jan  3 07:26:52 2014
@@ -34,9 +34,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.DF;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -50,6 +52,7 @@ import com.google.common.util.concurrent
 class FsVolumeImpl implements FsVolumeSpi {
   private final FsDatasetImpl dataset;
   private final String storageID;
+  private final StorageType storageType;
   private final Map<String, BlockPoolSlice> bpSlices
       = new HashMap<String, BlockPoolSlice>();
   private final File currentDir;    // <StorageDirectory>/current
@@ -64,7 +67,7 @@ class FsVolumeImpl implements FsVolumeSp
   private final ThreadPoolExecutor cacheExecutor;
   
   FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
-      Configuration conf) throws IOException {
+      Configuration conf, StorageType storageType) throws IOException {
     this.dataset = dataset;
     this.storageID = storageID;
     this.reserved = conf.getLong(
@@ -73,6 +76,7 @@ class FsVolumeImpl implements FsVolumeSp
     this.currentDir = currentDir; 
     File parent = currentDir.getParentFile();
     this.usage = new DF(parent, conf);
+    this.storageType = storageType;
     final int maxNumThreads = dataset.datanode.getConf().getInt(
         DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
         DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT
@@ -320,7 +324,19 @@ class FsVolumeImpl implements FsVolumeSp
     }
   }
 
-  String getStorageID() {
+  @Override
+  public String getStorageID() {
     return storageID;
   }
+  
+  @Override
+  public StorageType getStorageType() {
+    return storageType;
+  }
+  
+  DatanodeStorage toDatanodeStorage() {
+    return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
+  }
+
 }
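[Editor's note] toDatanodeStorage is how a volume advertises itself in the per-storage block reports above: its storage ID, a NORMAL state, and its configured storage type. A minimal sketch, where StorageType.SSD is only an example value:

    // Hypothetical sketch; constructor arguments mirror the updated
    // FsVolumeImpl signature above, and SSD is an assumed example.
    FsVolumeImpl volume = new FsVolumeImpl(dataset, storageUuid,
        currentDir, conf, StorageType.SSD);
    DatanodeStorage ds = volume.toDatanodeStorage();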
+

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Fri Jan  3 07:26:52 2014
@@ -18,10 +18,7 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
+import java.util.*;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -56,6 +53,7 @@ class FsVolumeList {
    * @param blockSize free space needed on the volume
    * @return next volume to store the block in.
    */
+  // TODO: should choose a volume matching the requested storage type
   synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
     return blockChooser.chooseVolume(volumes, blockSize);
   }
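[Editor's note] The TODO above hints at the eventual shape of volume choice under heterogeneous storage: filter candidates by storage type before delegating to the choosing policy. A hypothetical sketch, not part of this commit:

    // Hypothetical storage-type-aware variant; this overload does not
    // exist in this commit.
    synchronized FsVolumeImpl getNextVolume(StorageType type, long blockSize)
        throws IOException {
      List<FsVolumeImpl> candidates = new ArrayList<FsVolumeImpl>();
      for (FsVolumeImpl v : volumes) {
        if (v.getStorageType() == type) {
          candidates.add(v);
        }
      }
      return blockChooser.chooseVolume(candidates, blockSize);
    }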
@@ -92,27 +90,32 @@ class FsVolumeList {
     return remaining;
   }
     
-  void getVolumeMap(ReplicaMap volumeMap) throws IOException {
+  void initializeReplicaMaps(ReplicaMap globalReplicaMap) throws IOException {
     for (FsVolumeImpl v : volumes) {
-      v.getVolumeMap(volumeMap);
+      v.getVolumeMap(globalReplicaMap);
     }
   }
   
-  void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
+  void getAllVolumesMap(String bpid, ReplicaMap volumeMap) throws IOException {
     long totalStartTime = System.currentTimeMillis();
     for (FsVolumeImpl v : volumes) {
-      FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
-          " on volume " + v + "...");
-      long startTime = System.currentTimeMillis();
-      v.getVolumeMap(bpid, volumeMap);
-      long timeTaken = System.currentTimeMillis() - startTime;
-      FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
-          " on volume " + v + ": " + timeTaken + "ms");
+      getVolumeMap(bpid, v, volumeMap);
     }
     long totalTimeTaken = System.currentTimeMillis() - totalStartTime;
     FsDatasetImpl.LOG.info("Total time to add all replicas to map: "
         + totalTimeTaken + "ms");
   }
+
+  void getVolumeMap(String bpid, FsVolumeImpl volume, ReplicaMap volumeMap)
+      throws IOException {
+    FsDatasetImpl.LOG.info("Adding replicas to map for block pool " + bpid +
+                               " on volume " + volume + "...");
+    long startTime = System.currentTimeMillis();
+    volume.getVolumeMap(bpid, volumeMap);
+    long timeTaken = System.currentTimeMillis() - startTime;
+    FsDatasetImpl.LOG.info("Time to add replicas to map for block pool " + bpid +
+                               " on volume " + volume + ": " + timeTaken + "ms");
+  }
     
   /**
    * Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ReplicaMap.java Fri Jan  3 07:26:52 2014
@@ -117,6 +117,13 @@ class ReplicaMap {
       return  m.put(replicaInfo.getBlockId(), replicaInfo);
     }
   }
+
+  /**
+   * Add all entries from the given replica map into the local replica map.
+   */
+  void addAll(ReplicaMap other) {
+    map.putAll(other.map);
+  }
   
   /**
    * Remove the replica's meta information from the map that matches
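[Editor's note] addAll makes it possible to scan a volume into a scratch ReplicaMap and fold the result into the global map afterwards. Since putAll replaces colliding keys wholesale, this sketch assumes the merged entries do not overlap what is already present:

    // Hypothetical usage sketch; ReplicaMap(Object) and
    // FsVolumeImpl.getVolumeMap(ReplicaMap) appear earlier in this diff.
    ReplicaMap scratch = new ReplicaMap(dataset);
    newVolume.getVolumeMap(scratch);  // scan just the new volume
    globalMap.addAll(scratch);        // fold into the global map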

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Fri Jan  3 07:26:52 2014
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
@@ -27,17 +27,20 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
 
 import java.io.DataInput;
-import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
@@ -45,18 +48,21 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
@@ -68,7 +74,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.hdfs.util.ReadOnlyList;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.util.GSet;
 import org.apache.hadoop.util.LightWeightGSet;
@@ -79,7 +85,7 @@ import com.google.common.annotations.Vis
 /**
  * The Cache Manager handles caching on DataNodes.
  *
- * This class is instantiated by the FSNamesystem when caching is enabled.
+ * This class is instantiated by the FSNamesystem.
  * It maintains the mapping of cached blocks to datanodes via processing
  * datanode cache reports. Based on these reports and addition and removal of
  * caching directives, we will schedule caching and uncaching work.
@@ -88,6 +94,8 @@ import com.google.common.annotations.Vis
 public final class CacheManager {
   public static final Log LOG = LogFactory.getLog(CacheManager.class);
 
+  private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f;
+
   // TODO: add pending / underCached / schedule cached blocks stats.
 
   /**
@@ -143,32 +151,14 @@ public final class CacheManager {
   private final long scanIntervalMs;
 
   /**
-   * Whether caching is enabled.
-   *
-   * If caching is disabled, we will not process cache reports or store
-   * information about what is cached where.  We also do not start the
-   * CacheReplicationMonitor thread.  This will save resources, but provide
-   * less functionality.
-   *     
-   * Even when caching is disabled, we still store path-based cache
-   * information.  This information is stored in the edit log and fsimage.  We
-   * don't want to lose it just because a configuration setting was turned off.
-   * However, we will not act on this information if caching is disabled.
+   * All cached blocks.
    */
-  private final boolean enabled;
+  private final GSet<CachedBlock, CachedBlock> cachedBlocks;
 
   /**
-   * Whether the CacheManager is active.
-   * 
-   * When the CacheManager is active, it tells the DataNodes what to cache
-   * and uncache.  The CacheManager cannot become active if enabled = false.
+   * Lock which protects the CacheReplicationMonitor.
    */
-  private boolean active = false;
-
-  /**
-   * All cached blocks.
-   */
-  private final GSet<CachedBlock, CachedBlock> cachedBlocks;
+  private final ReentrantLock crmLock = new ReentrantLock();
 
   /**
    * The CacheReplicationMonitor.
@@ -189,54 +179,51 @@ public final class CacheManager {
     scanIntervalMs = conf.getLong(
         DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
         DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
-    this.enabled = conf.getBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY,
-        DFS_NAMENODE_CACHING_ENABLED_DEFAULT);
-    this.cachedBlocks = !enabled ? null :
-        new LightWeightGSet<CachedBlock, CachedBlock>(
-            LightWeightGSet.computeCapacity(0.25, "cachedBlocks"));
+    float cachedBlocksPercent = conf.getFloat(
+          DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT,
+          DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT);
+    if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) {
+      LOG.info("Using minimum value " + MIN_CACHED_BLOCKS_PERCENT +
+        " for " + DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT);
+      cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT;
+    }
+    this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>(
+          LightWeightGSet.computeCapacity(cachedBlocksPercent,
+              "cachedBlocks"));
+
   }
 
-  /**
-   * Activate the cache manager.
-   * 
-   * When the cache manager is active, tell the datanodes where to cache files.
-   */
-  public void activate() {
-    assert namesystem.hasWriteLock();
-    if (enabled && (!active)) {
-      LOG.info("Activating CacheManager.  " +
-          "Starting replication monitor thread...");
-      active = true;
-      monitor = new CacheReplicationMonitor(namesystem, this,
-         scanIntervalMs);
-      monitor.start();
+  public void startMonitorThread() {
+    crmLock.lock();
+    try {
+      if (this.monitor == null) {
+        this.monitor = new CacheReplicationMonitor(namesystem, this,
+            scanIntervalMs, crmLock);
+        this.monitor.start();
+      }
+    } finally {
+      crmLock.unlock();
     }
   }
 
-  /**
-   * Deactivate the cache manager.
-   * 
-   * When the cache manager is inactive, it does not tell the datanodes where to
-   * cache files.
-   */
-  public void deactivate() {
-    assert namesystem.hasWriteLock();
-    if (active) {
-      LOG.info("Deactivating CacheManager.  " +
-          "stopping CacheReplicationMonitor thread...");
-      active = false;
-      IOUtils.closeQuietly(monitor);
-      monitor = null;
-      LOG.info("CacheReplicationMonitor thread stopped and deactivated.");
+  public void stopMonitorThread() {
+    crmLock.lock();
+    try {
+      if (this.monitor != null) {
+        CacheReplicationMonitor prevMonitor = this.monitor;
+        this.monitor = null;
+        IOUtils.closeQuietly(prevMonitor);
+      }
+    } finally {
+      crmLock.unlock();
     }
   }
 
-  /**
-   * Return true only if the cache manager is active.
-   * Must be called under the FSN read or write lock.
-   */
-  public boolean isActive() {
-    return active;
+  public void clearDirectiveStats() {
+    assert namesystem.hasWriteLock();
+    for (CacheDirective directive : directivesById.values()) {
+      directive.resetStatistics();
+    }
   }
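[Editor's note] The enabled/active flags give way to an idempotent start/stop pair guarded by crmLock: start is a no-op if the monitor is already running, and stop clears the field before closing the old thread. A standalone sketch of the same pattern (all names hypothetical):

    import java.util.concurrent.locks.ReentrantLock;

    // Minimal sketch of the lock-guarded, idempotent start/stop pattern.
    class MonitorHolder {
      private final ReentrantLock lock = new ReentrantLock();
      private Thread monitor;  // null means "not running"

      void start(Runnable task) {
        lock.lock();
        try {
          if (monitor == null) {      // second start is a no-op
            monitor = new Thread(task);
            monitor.start();
          }
        } finally {
          lock.unlock();
        }
      }

      void stop() {
        lock.lock();
        try {
          if (monitor != null) {
            Thread prev = monitor;
            monitor = null;           // clear before shutting down
            prev.interrupt();
          }
        } finally {
          lock.unlock();
        }
      }
    }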
 
   /**
@@ -318,27 +305,112 @@ public final class CacheManager {
    * {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration
    * into an absolute time based on the local clock.
    * 
-   * @param directive from which to get the expiry time
-   * @param defaultValue to use if Expiration is not set
-   * @return Absolute expiry time in milliseconds since Unix epoch
-   * @throws InvalidRequestException if the Expiration is invalid
-   */
-  private static long validateExpiryTime(CacheDirectiveInfo directive,
-      long defaultValue) throws InvalidRequestException {
-    long expiryTime;
-    CacheDirectiveInfo.Expiration expiration = directive.getExpiration();
-    if (expiration != null) {
-      if (expiration.getMillis() < 0) {
-        throw new InvalidRequestException("Cannot set a negative expiration: "
-            + expiration.getMillis());
-      }
-      // Converts a relative duration into an absolute time based on the local
-      // clock
-      expiryTime = expiration.getAbsoluteMillis();
+   * @param info to validate.
+   * @param maxRelativeExpiryTime of the info's pool.
+   * @return the expiration time, or the pool's max absolute expiration if the
+   *         info's expiration was not set.
+   * @throws InvalidRequestException if the info's Expiration is invalid.
+   */
+  private static long validateExpiryTime(CacheDirectiveInfo info,
+      long maxRelativeExpiryTime) throws InvalidRequestException {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Validating directive " + info
+          + " pool maxRelativeExpiryTime " + maxRelativeExpiryTime);
+    }
+    final long now = new Date().getTime();
+    final long maxAbsoluteExpiryTime = now + maxRelativeExpiryTime;
+    if (info == null || info.getExpiration() == null) {
+      return maxAbsoluteExpiryTime;
+    }
+    Expiration expiry = info.getExpiration();
+    if (expiry.getMillis() < 0l) {
+      throw new InvalidRequestException("Cannot set a negative expiration: "
+          + expiry.getMillis());
+    }
+    long relExpiryTime, absExpiryTime;
+    if (expiry.isRelative()) {
+      relExpiryTime = expiry.getMillis();
+      absExpiryTime = now + relExpiryTime;
     } else {
-      expiryTime = defaultValue;
+      absExpiryTime = expiry.getMillis();
+      relExpiryTime = absExpiryTime - now;
+    }
+    // Need to cap the expiry so we don't overflow a long when doing math
+    if (relExpiryTime > Expiration.MAX_RELATIVE_EXPIRY_MS) {
+      throw new InvalidRequestException("Expiration "
+          + expiry.toString() + " is too far in the future!");
     }
-    return expiryTime;
+    // Fail if the requested expiry is greater than the max
+    if (relExpiryTime > maxRelativeExpiryTime) {
+      throw new InvalidRequestException("Expiration " + expiry.toString()
+          + " exceeds the max relative expiration time of "
+          + maxRelativeExpiryTime + " ms.");
+    }
+    return absExpiryTime;
+  }
+
+  /**
+   * Throws an exception if the CachePool does not have enough capacity to
+   * cache the given path at the replication factor.
+   *
+   * @param pool CachePool where the path is being cached
+   * @param path Path that is being cached
+   * @param replication Replication factor of the path
+   * @throws InvalidRequestException if the pool does not have enough capacity
+   */
+  private void checkLimit(CachePool pool, String path,
+      short replication) throws InvalidRequestException {
+    CacheDirectiveStats stats = computeNeeded(path, replication);
+    if (pool.getLimit() == CachePoolInfo.LIMIT_UNLIMITED) {
+      return;
+    }
+    if (pool.getBytesNeeded() + (stats.getBytesNeeded() * replication) > pool
+        .getLimit()) {
+      throw new InvalidRequestException("Caching path " + path + " of size "
+          + stats.getBytesNeeded() / replication + " bytes at replication "
+          + replication + " would exceed pool " + pool.getPoolName()
+          + "'s remaining capacity of "
+          + (pool.getLimit() - pool.getBytesNeeded()) + " bytes.");
+    }
+  }
+
+  /**
+   * Computes the needed number of bytes and files for a path.
+   * @return CacheDirectiveStats describing the needed stats for this path
+   */
+  private CacheDirectiveStats computeNeeded(String path, short replication) {
+    FSDirectory fsDir = namesystem.getFSDirectory();
+    INode node;
+    long requestedBytes = 0;
+    long requestedFiles = 0;
+    CacheDirectiveStats.Builder builder = new CacheDirectiveStats.Builder();
+    try {
+      node = fsDir.getINode(path);
+    } catch (UnresolvedLinkException e) {
+      // We don't cache through symlinks
+      return builder.build();
+    }
+    if (node == null) {
+      return builder.build();
+    }
+    if (node.isFile()) {
+      requestedFiles = 1;
+      INodeFile file = node.asFile();
+      requestedBytes = file.computeFileSize();
+    } else if (node.isDirectory()) {
+      INodeDirectory dir = node.asDirectory();
+      ReadOnlyList<INode> children = dir.getChildrenList(null);
+      requestedFiles = children.size();
+      for (INode child : children) {
+        if (child.isFile()) {
+          requestedBytes += child.asFile().computeFileSize();
+        }
+      }
+    }
+    return new CacheDirectiveStats.Builder()
+        .setBytesNeeded(requestedBytes)
+        .setFilesCached(requestedFiles)
+        .build();
   }
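[Editor's note] validateExpiryTime normalizes relative and absolute expirations into a common (relative, absolute) pair before checking them against the pool's cap. A worked example, assuming a hypothetical 7-day pool cap:

    // Worked example of the normalization above (values hypothetical).
    long now = new Date().getTime();
    long maxRelativeExpiryMs = 7L * 24 * 60 * 60 * 1000;   // 7-day cap

    // Unset expiration:   accepted, expires at now + maxRelativeExpiryMs.
    // Relative, 1 day:    relExpiryTime = 86400000 ms,
    //                     absExpiryTime = now + 86400000; under the cap.
    // Absolute, +30 days: relExpiryTime = 30 days, over the 7-day cap,
    //                     rejected with InvalidRequestException.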
 
   /**
@@ -384,20 +456,23 @@ public final class CacheManager {
       directivesByPath.put(path, directives);
     }
     directives.add(directive);
+    // Fix up pool stats
+    CacheDirectiveStats stats =
+        computeNeeded(directive.getPath(), directive.getReplication());
+    directive.addBytesNeeded(stats.getBytesNeeded());
+    directive.addFilesNeeded(directive.getFilesNeeded());
+
+    setNeedsRescan();
   }
 
   /**
-   * To be called only from the edit log loading code
+   * Adds a directive, skipping most error checking. This should only be called
+   * internally in special scenarios like edit log replay.
    */
   CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
       throws InvalidRequestException {
     long id = directive.getId();
-    CacheDirective entry =
-        new CacheDirective(
-            directive.getId(),
-            directive.getPath().toUri().getPath(),
-            directive.getReplication(),
-            directive.getExpiration().getAbsoluteMillis());
+    CacheDirective entry = new CacheDirective(directive);
     CachePool pool = cachePools.get(directive.getPool());
     addInternal(entry, pool);
     if (nextDirectiveId <= id) {
@@ -407,7 +482,7 @@ public final class CacheManager {
   }
 
   public CacheDirectiveInfo addDirective(
-      CacheDirectiveInfo info, FSPermissionChecker pc)
+      CacheDirectiveInfo info, FSPermissionChecker pc, EnumSet<CacheFlag> flags)
       throws IOException {
     assert namesystem.hasWriteLock();
     CacheDirective directive;
@@ -416,8 +491,11 @@ public final class CacheManager {
       checkWritePermission(pc, pool);
       String path = validatePath(info);
       short replication = validateReplication(info, (short)1);
-      long expiryTime = validateExpiryTime(info,
-          CacheDirectiveInfo.Expiration.EXPIRY_NEVER);
+      long expiryTime = validateExpiryTime(info, pool.getMaxRelativeExpiryMs());
+      // Do quota validation if required
+      if (!flags.contains(CacheFlag.FORCE)) {
+        checkLimit(pool, path, replication);
+      }
       // All validation passed
       // Add a new entry with the next available ID.
       long id = getNextDirectiveId();
@@ -428,14 +506,61 @@ public final class CacheManager {
       throw e;
     }
     LOG.info("addDirective of " + info + " successful.");
-    if (monitor != null) {
-      monitor.kick();
-    }
     return directive.toInfo();
   }
 
+  /**
+   * Factory method that makes a new CacheDirectiveInfo by applying fields in a
+   * CacheDirectiveInfo to an existing CacheDirective.
+   * 
+   * @param info with some or all fields set.
+   * @param defaults directive providing default values for unset fields in
+   *          info.
+   * 
+   * @return new CacheDirectiveInfo of the info applied to the defaults.
+   */
+  private static CacheDirectiveInfo createFromInfoAndDefaults(
+      CacheDirectiveInfo info, CacheDirective defaults) {
+    // Initialize the builder with the default values
+    CacheDirectiveInfo.Builder builder =
+        new CacheDirectiveInfo.Builder(defaults.toInfo());
+    // Replace default with new value if present
+    if (info.getPath() != null) {
+      builder.setPath(info.getPath());
+    }
+    if (info.getReplication() != null) {
+      builder.setReplication(info.getReplication());
+    }
+    if (info.getPool() != null) {
+      builder.setPool(info.getPool());
+    }
+    if (info.getExpiration() != null) {
+      builder.setExpiration(info.getExpiration());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Modifies a directive, skipping most error checking. This is for careful
+   * internal use only. modifyDirective can be non-deterministic since its error
+   * checking depends on current system time, which poses a problem for edit log
+   * replay.
+   */
+  void modifyDirectiveFromEditLog(CacheDirectiveInfo info)
+      throws InvalidRequestException {
+    // Check for invalid IDs.
+    Long id = info.getId();
+    if (id == null) {
+      throw new InvalidRequestException("Must supply an ID.");
+    }
+    CacheDirective prevEntry = getById(id);
+    CacheDirectiveInfo newInfo = createFromInfoAndDefaults(info, prevEntry);
+    removeInternal(prevEntry);
+    addInternal(new CacheDirective(newInfo), getCachePool(newInfo.getPool()));
+  }
+
   public void modifyDirective(CacheDirectiveInfo info,
-      FSPermissionChecker pc) throws IOException {
+      FSPermissionChecker pc, EnumSet<CacheFlag> flags) throws IOException {
     assert namesystem.hasWriteLock();
     String idString =
         (info.getId() == null) ?
@@ -448,26 +573,36 @@ public final class CacheManager {
       }
       CacheDirective prevEntry = getById(id);
       checkWritePermission(pc, prevEntry.getPool());
-      String path = prevEntry.getPath();
-      if (info.getPath() != null) {
-        path = validatePath(info);
-      }
 
-      short replication = prevEntry.getReplication();
-      replication = validateReplication(info, replication);
+      // Fill in defaults
+      CacheDirectiveInfo infoWithDefaults =
+          createFromInfoAndDefaults(info, prevEntry);
+      CacheDirectiveInfo.Builder builder =
+          new CacheDirectiveInfo.Builder(infoWithDefaults);
+
+      // Do validation
+      validatePath(infoWithDefaults);
+      validateReplication(infoWithDefaults, (short)-1);
+      // Need to test the pool being set here to avoid rejecting a modify for a
+      // directive that's already been forced into a pool
+      CachePool srcPool = prevEntry.getPool();
+      CachePool destPool = getCachePool(validatePoolName(infoWithDefaults));
+      if (!srcPool.getPoolName().equals(destPool.getPoolName())) {
+        checkWritePermission(pc, destPool);
+        if (!flags.contains(CacheFlag.FORCE)) {
+          checkLimit(destPool, infoWithDefaults.getPath().toUri().getPath(),
+              infoWithDefaults.getReplication());
+        }
+      }
+      // Verify the expiration against the destination pool
+      validateExpiryTime(infoWithDefaults, destPool.getMaxRelativeExpiryMs());
 
-      long expiryTime = prevEntry.getExpiryTime();
-      expiryTime = validateExpiryTime(info, expiryTime);
+      // Indicate changes to the CRM
+      setNeedsRescan();
 
-      CachePool pool = prevEntry.getPool();
-      if (info.getPool() != null) {
-        pool = getCachePool(validatePoolName(info));
-        checkWritePermission(pc, pool);
-      }
+      // Validation passed
       removeInternal(prevEntry);
-      CacheDirective newEntry =
-          new CacheDirective(id, path, replication, expiryTime);
-      addInternal(newEntry, pool);
+      addInternal(new CacheDirective(builder.build()), destPool);
     } catch (IOException e) {
       LOG.warn("modifyDirective of " + idString + " failed: ", e);
       throw e;
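[Editor's note] Both addDirective and modifyDirective now take an EnumSet<CacheFlag>: an empty set keeps the quota check on, while CacheFlag.FORCE bypasses it. A hypothetical caller, using the signatures from this diff:

    // Hypothetical caller sketch.
    cacheManager.addDirective(info, pc, EnumSet.noneOf(CacheFlag.class));
    cacheManager.modifyDirective(update, pc, EnumSet.of(CacheFlag.FORCE));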
@@ -476,7 +611,7 @@ public final class CacheManager {
         info+ ".");
   }
 
-  public void removeInternal(CacheDirective directive)
+  private void removeInternal(CacheDirective directive)
       throws InvalidRequestException {
     assert namesystem.hasWriteLock();
     // Remove the corresponding entry in directivesByPath.
@@ -489,9 +624,16 @@ public final class CacheManager {
     if (directives.size() == 0) {
       directivesByPath.remove(path);
     }
+    // Fix up the stats from removing the pool
+    final CachePool pool = directive.getPool();
+    directive.addBytesNeeded(-directive.getBytesNeeded());
+    directive.addFilesNeeded(-directive.getFilesNeeded());
+
     directivesById.remove(directive.getId());
-    directive.getPool().getDirectiveList().remove(directive);
+    pool.getDirectiveList().remove(directive);
     assert directive.getPool() == null;
+
+    setNeedsRescan();
   }
 
   public void removeDirective(long id, FSPermissionChecker pc)
@@ -505,9 +647,6 @@ public final class CacheManager {
       LOG.warn("removeDirective of " + id + " failed: ", e);
       throw e;
     }
-    if (monitor != null) {
-      monitor.kick();
-    }
     LOG.info("removeDirective of " + id + " successful.");
   }
 
@@ -573,16 +712,22 @@ public final class CacheManager {
   public CachePoolInfo addCachePool(CachePoolInfo info)
       throws IOException {
     assert namesystem.hasWriteLock();
-    CachePoolInfo.validate(info);
-    String poolName = info.getPoolName();
-    CachePool pool = cachePools.get(poolName);
-    if (pool != null) {
-      throw new InvalidRequestException("Cache pool " + poolName
-          + " already exists.");
-    }
-    pool = CachePool.createFromInfoAndDefaults(info);
-    cachePools.put(pool.getPoolName(), pool);
-    LOG.info("Created new cache pool " + pool);
+    CachePool pool;
+    try {
+      CachePoolInfo.validate(info);
+      String poolName = info.getPoolName();
+      pool = cachePools.get(poolName);
+      if (pool != null) {
+        throw new InvalidRequestException("Cache pool " + poolName
+            + " already exists.");
+      }
+      pool = CachePool.createFromInfoAndDefaults(info);
+      cachePools.put(pool.getPoolName(), pool);
+    } catch (IOException e) {
+      LOG.info("addCachePool of " + info + " failed: ", e);
+      throw e;
+    }
+    LOG.info("addCachePool of " + info + " successful.");
     return pool.getInfo(true);
   }
 
@@ -597,42 +742,56 @@ public final class CacheManager {
   public void modifyCachePool(CachePoolInfo info)
       throws IOException {
     assert namesystem.hasWriteLock();
-    CachePoolInfo.validate(info);
-    String poolName = info.getPoolName();
-    CachePool pool = cachePools.get(poolName);
-    if (pool == null) {
-      throw new InvalidRequestException("Cache pool " + poolName
-          + " does not exist.");
-    }
     StringBuilder bld = new StringBuilder();
-    String prefix = "";
-    if (info.getOwnerName() != null) {
-      pool.setOwnerName(info.getOwnerName());
-      bld.append(prefix).
-        append("set owner to ").append(info.getOwnerName());
-      prefix = "; ";
-    }
-    if (info.getGroupName() != null) {
-      pool.setGroupName(info.getGroupName());
-      bld.append(prefix).
-        append("set group to ").append(info.getGroupName());
-      prefix = "; ";
-    }
-    if (info.getMode() != null) {
-      pool.setMode(info.getMode());
-      bld.append(prefix).append("set mode to " + info.getMode());
-      prefix = "; ";
-    }
-    if (info.getWeight() != null) {
-      pool.setWeight(info.getWeight());
-      bld.append(prefix).
-        append("set weight to ").append(info.getWeight());
-      prefix = "; ";
-    }
-    if (prefix.isEmpty()) {
-      bld.append("no changes.");
+    try {
+      CachePoolInfo.validate(info);
+      String poolName = info.getPoolName();
+      CachePool pool = cachePools.get(poolName);
+      if (pool == null) {
+        throw new InvalidRequestException("Cache pool " + poolName
+            + " does not exist.");
+      }
+      String prefix = "";
+      if (info.getOwnerName() != null) {
+        pool.setOwnerName(info.getOwnerName());
+        bld.append(prefix).
+          append("set owner to ").append(info.getOwnerName());
+        prefix = "; ";
+      }
+      if (info.getGroupName() != null) {
+        pool.setGroupName(info.getGroupName());
+        bld.append(prefix).
+          append("set group to ").append(info.getGroupName());
+        prefix = "; ";
+      }
+      if (info.getMode() != null) {
+        pool.setMode(info.getMode());
+        bld.append(prefix).append("set mode to " + info.getMode());
+        prefix = "; ";
+      }
+      if (info.getLimit() != null) {
+        pool.setLimit(info.getLimit());
+        bld.append(prefix).append("set limit to " + info.getLimit());
+        prefix = "; ";
+        // New limit changes stats, need to set needs refresh
+        setNeedsRescan();
+      }
+      if (info.getMaxRelativeExpiryMs() != null) {
+        final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs();
+        pool.setMaxRelativeExpiryMs(maxRelativeExpiry);
+        bld.append(prefix).append("set maxRelativeExpiry to "
+            + maxRelativeExpiry);
+        prefix = "; ";
+      }
+      if (prefix.isEmpty()) {
+        bld.append("no changes.");
+      }
+    } catch (IOException e) {
+      LOG.info("modifyCachePool of " + info + " failed: ", e);
+      throw e;
     }
-    LOG.info("modified " + poolName + "; " + bld.toString());
+    LOG.info("modifyCachePool of " + info.getPoolName() + " successful; "
+        + bld.toString());
   }
 
   /**
@@ -646,23 +805,27 @@ public final class CacheManager {
   public void removeCachePool(String poolName)
       throws IOException {
     assert namesystem.hasWriteLock();
-    CachePoolInfo.validateName(poolName);
-    CachePool pool = cachePools.remove(poolName);
-    if (pool == null) {
-      throw new InvalidRequestException(
-          "Cannot remove non-existent cache pool " + poolName);
-    }
-    // Remove all directives in this pool.
-    Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
-    while (iter.hasNext()) {
-      CacheDirective directive = iter.next();
-      directivesByPath.remove(directive.getPath());
-      directivesById.remove(directive.getId());
-      iter.remove();
-    }
-    if (monitor != null) {
-      monitor.kick();
+    try {
+      CachePoolInfo.validateName(poolName);
+      CachePool pool = cachePools.remove(poolName);
+      if (pool == null) {
+        throw new InvalidRequestException(
+            "Cannot remove non-existent cache pool " + poolName);
+      }
+      // Remove all directives in this pool.
+      Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
+      while (iter.hasNext()) {
+        CacheDirective directive = iter.next();
+        directivesByPath.remove(directive.getPath());
+        directivesById.remove(directive.getId());
+        iter.remove();
+      }
+      setNeedsRescan();
+    } catch (IOException e) {
+      LOG.info("removeCachePool of " + poolName + " failed: ", e);
+      throw e;
     }
+    LOG.info("removeCachePool of " + poolName + " successful.");
   }
 
   public BatchedListEntries<CachePoolEntry>
@@ -683,9 +846,6 @@ public final class CacheManager {
   }
 
   public void setCachedLocations(LocatedBlock block) {
-    if (!enabled) {
-      return;
-    }
     CachedBlock cachedBlock =
         new CachedBlock(block.getBlock().getBlockId(),
             (short)0, false);
@@ -701,12 +861,6 @@ public final class CacheManager {
 
   public final void processCacheReport(final DatanodeID datanodeID,
       final List<Long> blockIds) throws IOException {
-    if (!enabled) {
-      LOG.info("Ignoring cache report from " + datanodeID +
-          " because " + DFS_NAMENODE_CACHING_ENABLED_KEY + " = false. " +
-          "number of blocks: " + blockIds.size());
-      return;
-    }
     namesystem.writeLock();
     final long startTime = Time.monotonicNow();
     final long endTime;
@@ -738,39 +892,28 @@ public final class CacheManager {
       final List<Long> blockIds) {
     CachedBlocksList cached = datanode.getCached();
     cached.clear();
+    CachedBlocksList cachedList = datanode.getCached();
+    CachedBlocksList pendingCachedList = datanode.getPendingCached();
     for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
-      Block block = new Block(iter.next());
-      BlockInfo blockInfo = blockManager.getStoredBlock(block);
-      if (!blockInfo.isComplete()) {
-        LOG.warn("Ignoring block id " + block.getBlockId() + ", because " +
-            "it is in not complete yet.  It is in state " + 
-            blockInfo.getBlockUCState());
-        continue;
-      }
-      Collection<DatanodeDescriptor> corruptReplicas =
-          blockManager.getCorruptReplicas(blockInfo);
-      if ((corruptReplicas != null) && corruptReplicas.contains(datanode)) {
-        // The NameNode will eventually remove or update the corrupt block.
-        // Until then, we pretend that it isn't cached.
-        LOG.warn("Ignoring cached replica on " + datanode + " of " + block +
-            " because it is corrupt.");
-        continue;
-      }
+      long blockId = iter.next();
       CachedBlock cachedBlock =
-          new CachedBlock(block.getBlockId(), (short)0, false);
+          new CachedBlock(blockId, (short)0, false);
       CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
-      // Use the existing CachedBlock if it's present; otherwise,
-      // insert a new one.
+      // Add the block ID from the cache report to the cachedBlocks map
+      // if it's not already there.
       if (prevCachedBlock != null) {
         cachedBlock = prevCachedBlock;
       } else {
         cachedBlocks.put(cachedBlock);
       }
-      if (!cachedBlock.isPresent(datanode.getCached())) {
-        datanode.getCached().add(cachedBlock);
+      // Add the block to the datanode's implicit cached block list
+      // if it's not already there.  Similarly, remove it from the pending
+      // cached block list if it exists there.
+      if (!cachedBlock.isPresent(cachedList)) {
+        cachedList.add(cachedBlock);
       }
-      if (cachedBlock.isPresent(datanode.getPendingCached())) {
-        datanode.getPendingCached().remove(cachedBlock);
+      if (cachedBlock.isPresent(pendingCachedList)) {
+        pendingCachedList.remove(cachedBlock);
       }
     }
   }
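[Editor's note] The rewritten loop trusts the reported block IDs directly: each one is ensured present in the datanode's cached list and removed from its pending-cached list. The same bookkeeping with plain collections, as a simplification (the real code uses the intrusive CachedBlocksList and CachedBlock shown above):

    // Simplified sketch with standard collections.
    Set<Long> cached = new HashSet<Long>();
    Set<Long> pendingCached = new HashSet<Long>();

    void onCacheReport(List<Long> reportedBlockIds) {
      cached.clear();                 // rebuild from this report
      for (long id : reportedBlockIds) {
        cached.add(id);               // now known to be cached
        pendingCached.remove(id);     // no longer merely pending
      }
    }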
@@ -782,7 +925,7 @@ public final class CacheManager {
    * @param sdPath path of the storage directory
    * @throws IOException
    */
-  public void saveState(DataOutput out, String sdPath)
+  public void saveState(DataOutputStream out, String sdPath)
       throws IOException {
     out.writeLong(nextDirectiveId);
     savePools(out, sdPath);
@@ -805,7 +948,7 @@ public final class CacheManager {
   /**
    * Save cache pools to fsimage
    */
-  private void savePools(DataOutput out,
+  private void savePools(DataOutputStream out,
       String sdPath) throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = new Step(StepType.CACHE_POOLS, sdPath);
@@ -814,7 +957,7 @@ public final class CacheManager {
     Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
     out.writeInt(cachePools.size());
     for (CachePool pool: cachePools.values()) {
-      pool.getInfo(true).writeTo(out);
+      FSImageSerialization.writeCachePoolInfo(out, pool.getInfo(true));
       counter.increment();
     }
     prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -823,7 +966,7 @@ public final class CacheManager {
   /*
    * Save cache entries to fsimage
    */
-  private void saveDirectives(DataOutput out, String sdPath)
+  private void saveDirectives(DataOutputStream out, String sdPath)
       throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
@@ -832,11 +975,7 @@ public final class CacheManager {
     Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
     out.writeInt(directivesById.size());
     for (CacheDirective directive : directivesById.values()) {
-      out.writeLong(directive.getId());
-      Text.writeString(out, directive.getPath());
-      out.writeShort(directive.getReplication());
-      Text.writeString(out, directive.getPool().getPoolName());
-      out.writeLong(directive.getExpiryTime());
+      FSImageSerialization.writeCacheDirectiveInfo(out, directive.toInfo());
       counter.increment();
     }
     prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -854,7 +993,7 @@ public final class CacheManager {
     prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
     Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
     for (int i = 0; i < numberOfPools; i++) {
-      addCachePool(CachePoolInfo.readFrom(in));
+      addCachePool(FSImageSerialization.readCachePoolInfo(in));
       counter.increment();
     }
     prog.endStep(Phase.LOADING_FSIMAGE, step);
@@ -871,19 +1010,17 @@ public final class CacheManager {
     prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
     Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
     for (int i = 0; i < numDirectives; i++) {
-      long directiveId = in.readLong();
-      String path = Text.readString(in);
-      short replication = in.readShort();
-      String poolName = Text.readString(in);
-      long expiryTime = in.readLong();
+      CacheDirectiveInfo info = FSImageSerialization.readCacheDirectiveInfo(in);
       // Get pool reference by looking it up in the map
+      final String poolName = info.getPool();
       CachePool pool = cachePools.get(poolName);
       if (pool == null) {
         throw new IOException("Directive refers to pool " + poolName +
             ", which does not exist.");
       }
       CacheDirective directive =
-          new CacheDirective(directiveId, path, replication, expiryTime);
+          new CacheDirective(info.getId(), info.getPath().toUri().getPath(),
+              info.getReplication(), info.getExpiration().getAbsoluteMillis());
       boolean addedDirective = pool.getDirectiveList().add(directive);
       assert addedDirective;
       if (directivesById.put(directive.getId(), directive) != null) {
@@ -901,4 +1038,36 @@ public final class CacheManager {
     }
     prog.endStep(Phase.LOADING_FSIMAGE, step);
   }
+
+  public void waitForRescanIfNeeded() {
+    crmLock.lock();
+    try {
+      if (monitor != null) {
+        monitor.waitForRescanIfNeeded();
+      }
+    } finally {
+      crmLock.unlock();
+    }
+  }
+
+  private void setNeedsRescan() {
+    crmLock.lock();
+    try {
+      if (monitor != null) {
+        monitor.setNeedsRescan();
+      }
+    } finally {
+      crmLock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public Thread getCacheReplicationMonitor() {
+    crmLock.lock();
+    try {
+      return monitor;
+    } finally {
+      crmLock.unlock();
+    }
+  }
 }
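
The three methods added above share one shape: take crmLock, null-check the CacheReplicationMonitor, delegate, and release in a finally block, since the monitor may be torn down or not yet started. A generic sketch of that guarded-delegation idiom with a ReentrantLock (the names are hypothetical stand-ins):

    import java.util.concurrent.locks.ReentrantLock;

    class GuardedDelegateSketch {
      private final ReentrantLock lock = new ReentrantLock();
      private Runnable monitor; // may be set, swapped, or cleared under the lock

      void pokeMonitor() {
        lock.lock();
        try {
          if (monitor != null) { // monitor can legitimately be absent
            monitor.run();
          }
        } finally {
          lock.unlock(); // always release, even if the delegate throws
        }
      }
    }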

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Fri Jan  3 07:26:52 2014
@@ -49,8 +49,6 @@ import com.google.common.base.Preconditi
 public final class CachePool {
   public static final Log LOG = LogFactory.getLog(CachePool.class);
 
-  public static final int DEFAULT_WEIGHT = 100;
-  
   @Nonnull
   private final String poolName;
 
@@ -71,7 +69,16 @@ public final class CachePool {
   @Nonnull
   private FsPermission mode;
 
-  private int weight;
+  /**
+   * Maximum number of bytes that can be cached in this pool.
+   */
+  private long limit;
+
+  /**
+   * Maximum duration that a CacheDirective in this pool remains valid,
+   * in milliseconds.
+   */
+  private long maxRelativeExpiryMs;
 
   private long bytesNeeded;
   private long bytesCached;
@@ -118,10 +125,13 @@ public final class CachePool {
     }
     FsPermission mode = (info.getMode() == null) ? 
         FsPermission.getCachePoolDefault() : info.getMode();
-    Integer weight = (info.getWeight() == null) ?
-        DEFAULT_WEIGHT : info.getWeight();
+    long limit = info.getLimit() == null ?
+        CachePoolInfo.DEFAULT_LIMIT : info.getLimit();
+    long maxRelativeExpiry = info.getMaxRelativeExpiryMs() == null ?
+        CachePoolInfo.DEFAULT_MAX_RELATIVE_EXPIRY :
+        info.getMaxRelativeExpiryMs();
     return new CachePool(info.getPoolName(),
-        ownerName, groupName, mode, weight);
+        ownerName, groupName, mode, limit, maxRelativeExpiry);
   }
 
   /**
@@ -131,11 +141,11 @@ public final class CachePool {
   static CachePool createFromInfo(CachePoolInfo info) {
     return new CachePool(info.getPoolName(),
         info.getOwnerName(), info.getGroupName(),
-        info.getMode(), info.getWeight());
+        info.getMode(), info.getLimit(), info.getMaxRelativeExpiryMs());
   }
 
   CachePool(String poolName, String ownerName, String groupName,
-      FsPermission mode, int weight) {
+      FsPermission mode, long limit, long maxRelativeExpiry) {
     Preconditions.checkNotNull(poolName);
     Preconditions.checkNotNull(ownerName);
     Preconditions.checkNotNull(groupName);
@@ -144,7 +154,8 @@ public final class CachePool {
     this.ownerName = ownerName;
     this.groupName = groupName;
     this.mode = new FsPermission(mode);
-    this.weight = weight;
+    this.limit = limit;
+    this.maxRelativeExpiryMs = maxRelativeExpiry;
   }
 
   public String getPoolName() {
@@ -177,16 +188,25 @@ public final class CachePool {
     this.mode = new FsPermission(mode);
     return this;
   }
-  
-  public int getWeight() {
-    return weight;
+
+  public long getLimit() {
+    return limit;
   }
 
-  public CachePool setWeight(int weight) {
-    this.weight = weight;
+  public CachePool setLimit(long bytes) {
+    this.limit = bytes;
     return this;
   }
-  
+
+  public long getMaxRelativeExpiryMs() {
+    return maxRelativeExpiryMs;
+  }
+
+  public CachePool setMaxRelativeExpiryMs(long expiry) {
+    this.maxRelativeExpiryMs = expiry;
+    return this;
+  }
+
   /**
    * Get either full or partial information about this CachePool.
    *
@@ -204,7 +224,8 @@ public final class CachePool {
     return info.setOwnerName(ownerName).
         setGroupName(groupName).
         setMode(new FsPermission(mode)).
-        setWeight(weight);
+        setLimit(limit).
+        setMaxRelativeExpiryMs(maxRelativeExpiryMs);
   }
 
   /**
@@ -241,6 +262,10 @@ public final class CachePool {
     return bytesCached;
   }
 
+  public long getBytesOverlimit() {
+    return Math.max(bytesNeeded - limit, 0);
+  }
+
   public long getFilesNeeded() {
     return filesNeeded;
   }
@@ -258,6 +283,7 @@ public final class CachePool {
     return new CachePoolStats.Builder().
         setBytesNeeded(bytesNeeded).
         setBytesCached(bytesCached).
+        setBytesOverlimit(getBytesOverlimit()).
         setFilesNeeded(filesNeeded).
         setFilesCached(filesCached).
         build();
@@ -291,7 +317,8 @@ public final class CachePool {
         append(", ownerName:").append(ownerName).
         append(", groupName:").append(groupName).
         append(", mode:").append(mode).
-        append(", weight:").append(weight).
+        append(", limit:").append(limit).
+        append(", maxRelativeExpiryMs:").append(maxRelativeExpiryMs).
         append(" }").toString();
   }
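
With weight gone, callers size a pool in bytes and bound directive lifetimes directly. A usage sketch against the chained setters visible in getInfo above, assuming they behave as shown there (the 64 MB limit and one-hour expiry are arbitrary example values):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

    public class PoolSetupExample {
      public static CachePoolInfo examplePool() {
        return new CachePoolInfo("research")
            .setLimit(64L * 1024 * 1024)                         // cap the pool at 64 MB cached
            .setMaxRelativeExpiryMs(TimeUnit.HOURS.toMillis(1)); // directives valid for <= 1 hour
      }
    }

getBytesOverlimit() then reports max(bytesNeeded - limit, 0), i.e. how far the pool's demand exceeds its byte budget.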
 

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DirectoryWithQuotaFeature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DirectoryWithQuotaFeature.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DirectoryWithQuotaFeature.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/DirectoryWithQuotaFeature.java Fri Jan  3 07:26:52 2014
@@ -25,7 +25,7 @@ import org.apache.hadoop.hdfs.protocol.Q
 /**
  * Quota feature for {@link INodeDirectory}. 
  */
-public final class DirectoryWithQuotaFeature extends INodeDirectory.Feature {
+public final class DirectoryWithQuotaFeature implements INode.Feature {
   public static final long DEFAULT_NAMESPACE_QUOTA = Long.MAX_VALUE;
   public static final long DEFAULT_DISKSPACE_QUOTA = HdfsConstants.QUOTA_RESET;
 
@@ -153,6 +153,10 @@ public final class DirectoryWithQuotaFea
     verifyNamespaceQuota(nsDelta);
     verifyDiskspaceQuota(dsDelta);
   }
+  
+  boolean isQuotaSet() {
+    return nsQuota >= 0 || dsQuota >= 0;
+  }
 
   private String namespaceString() {
     return "namespace: " + (nsQuota < 0? "-": namespace + "/" + nsQuota);

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Jan  3 07:26:52 2014
@@ -62,11 +62,11 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithCount;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.Root;
 import org.apache.hadoop.hdfs.util.ByteArray;
@@ -364,7 +364,7 @@ public class FSDirectory implements Clos
    * Add a block to the file. Returns a reference to the added block.
    */
   BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block,
-      DatanodeDescriptor targets[]) throws IOException {
+      DatanodeStorageInfo[] targets) throws IOException {
     waitForReady();
 
     writeLock();
@@ -621,8 +621,7 @@ public class FSDirectory implements Clos
     // snapshot is taken on the dst tree, changes will be recorded in the latest
     // snapshot of the src tree.
     if (isSrcInSnapshot) {
-      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshot(),
-          inodeMap);
+      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshot());
       srcIIP.setLastINode(srcChild);
     }
     
@@ -691,11 +690,9 @@ public class FSDirectory implements Clos
         }
         // update modification time of dst and the parent of src
         final INode srcParent = srcIIP.getINode(-2);
-        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot(),
-            inodeMap);
+        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot());
         dstParent = dstIIP.getINode(-2); // refresh dstParent
-        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot(),
-            inodeMap);
+        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot());
         // update moved leases with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);     
 
@@ -733,11 +730,10 @@ public class FSDirectory implements Clos
         }
         
         if (isSrcInSnapshot) {
-          // srcParent must be an INodeDirectoryWithSnapshot instance since
-          // isSrcInSnapshot is true and src node has been removed from 
-          // srcParent 
-          ((INodeDirectoryWithSnapshot) srcParent).undoRename4ScrParent(
-              oldSrcChild.asReference(), srcChild, srcIIP.getLatestSnapshot());
+          // srcParent must have snapshot feature since isSrcInSnapshot is true
+          // and src node has been removed from srcParent 
+          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild,
+              srcIIP.getLatestSnapshot());
         } else {
           // original srcChild is not in latest snapshot, we only need to add
           // the srcChild back
@@ -878,8 +874,7 @@ public class FSDirectory implements Clos
     // snapshot is taken on the dst tree, changes will be recorded in the latest
     // snapshot of the src tree.
     if (isSrcInSnapshot) {
-      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshot(),
-          inodeMap);
+      srcChild = srcChild.recordModification(srcIIP.getLatestSnapshot());
       srcIIP.setLastINode(srcChild);
     }
     
@@ -957,11 +952,9 @@ public class FSDirectory implements Clos
         }
 
         final INode srcParent = srcIIP.getINode(-2);
-        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot(),
-            inodeMap);
+        srcParent.updateModificationTime(timestamp, srcIIP.getLatestSnapshot());
         dstParent = dstIIP.getINode(-2);
-        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot(),
-            inodeMap);
+        dstParent.updateModificationTime(timestamp, dstIIP.getLatestSnapshot());
         // update moved lease with new filename
         getFSNamesystem().unprotectedChangeLease(src, dst);
 
@@ -1018,9 +1011,9 @@ public class FSDirectory implements Clos
           withCount.getReferredINode().setLocalName(srcChildName);
         }
         
-        if (srcParent instanceof INodeDirectoryWithSnapshot) {
-          ((INodeDirectoryWithSnapshot) srcParent).undoRename4ScrParent(
-              oldSrcChild.asReference(), srcChild, srcIIP.getLatestSnapshot());
+        if (srcParent.isWithSnapshot()) {
+          srcParent.undoRename4ScrParent(oldSrcChild.asReference(), srcChild,
+              srcIIP.getLatestSnapshot());
         } else {
           // srcParent does not have the snapshot feature, we only need to add
           // the srcChild back
@@ -1029,9 +1022,9 @@ public class FSDirectory implements Clos
       }
       if (undoRemoveDst) {
         // Rename failed - restore dst
-        if (dstParent instanceof INodeDirectoryWithSnapshot) {
-          ((INodeDirectoryWithSnapshot) dstParent).undoRename4DstParent(
-              removedDst, dstIIP.getLatestSnapshot());
+        if (dstParent.isDirectory() && dstParent.asDirectory().isWithSnapshot()) {
+          dstParent.asDirectory().undoRename4DstParent(removedDst,
+              dstIIP.getLatestSnapshot());
         } else {
           addLastINodeNoQuotaCheck(dstIIP, removedDst);
         }
@@ -1162,8 +1155,7 @@ public class FSDirectory implements Clos
     if (inode == null) {
       throw new FileNotFoundException("File does not exist: " + src);
     }
-    inode.setPermission(permissions, inodesInPath.getLatestSnapshot(), 
-        inodeMap);
+    inode.setPermission(permissions, inodesInPath.getLatestSnapshot());
   }
 
   void setOwner(String src, String username, String groupname)
@@ -1188,11 +1180,10 @@ public class FSDirectory implements Clos
       throw new FileNotFoundException("File does not exist: " + src);
     }
     if (username != null) {
-      inode = inode.setUser(username, inodesInPath.getLatestSnapshot(),
-          inodeMap);
+      inode = inode.setUser(username, inodesInPath.getLatestSnapshot());
     }
     if (groupname != null) {
-      inode.setGroup(groupname, inodesInPath.getLatestSnapshot(), inodeMap);
+      inode.setGroup(groupname, inodesInPath.getLatestSnapshot());
     }
   }
 
@@ -1265,7 +1256,7 @@ public class FSDirectory implements Clos
       if(nodeToRemove == null) continue;
       
       nodeToRemove.setBlocks(null);
-      trgParent.removeChild(nodeToRemove, trgLatestSnapshot, null);
+      trgParent.removeChild(nodeToRemove, trgLatestSnapshot);
       inodeMap.remove(nodeToRemove);
       count++;
     }
@@ -1273,8 +1264,8 @@ public class FSDirectory implements Clos
     // update inodeMap
     removeFromInodeMap(Arrays.asList(allSrcInodes));
     
-    trgInode.setModificationTime(timestamp, trgLatestSnapshot, inodeMap);
-    trgParent.updateModificationTime(timestamp, trgLatestSnapshot, inodeMap);
+    trgInode.setModificationTime(timestamp, trgLatestSnapshot);
+    trgParent.updateModificationTime(timestamp, trgLatestSnapshot);
     // update quota on the parent directory ('count' files removed, 0 space)
     unprotectedUpdateCount(trgIIP, trgINodes.length-1, -count, 0);
   }
@@ -1418,7 +1409,7 @@ public class FSDirectory implements Clos
 
     // record modification
     final Snapshot latestSnapshot = iip.getLatestSnapshot();
-    targetNode = targetNode.recordModification(latestSnapshot, inodeMap);
+    targetNode = targetNode.recordModification(latestSnapshot);
     iip.setLastINode(targetNode);
 
     // Remove the node from the namespace
@@ -1429,7 +1420,7 @@ public class FSDirectory implements Clos
 
     // set the parent's modification time
     final INodeDirectory parent = targetNode.getParent();
-    parent.updateModificationTime(mtime, latestSnapshot, inodeMap);
+    parent.updateModificationTime(mtime, latestSnapshot);
     if (removed == 0) {
       return 0;
     }
@@ -2202,8 +2193,7 @@ public class FSDirectory implements Clos
     final INodeDirectory parent = inodes[pos-1].asDirectory();
     boolean added = false;
     try {
-      added = parent.addChild(child, true, iip.getLatestSnapshot(),
-          inodeMap);
+      added = parent.addChild(child, true, iip.getLatestSnapshot());
     } catch (QuotaExceededException e) {
       updateCountNoQuotaCheck(iip, pos,
           -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
@@ -2241,7 +2231,7 @@ public class FSDirectory implements Clos
     final Snapshot latestSnapshot = iip.getLatestSnapshot();
     final INode last = iip.getLastINode();
     final INodeDirectory parent = iip.getINode(-2).asDirectory();
-    if (!parent.removeChild(last, latestSnapshot, inodeMap)) {
+    if (!parent.removeChild(last, latestSnapshot)) {
       return -1;
     }
     INodeDirectory newParent = last.getParent();
@@ -2393,7 +2383,7 @@ public class FSDirectory implements Clos
       }
 
       final Snapshot latest = iip.getLatestSnapshot();
-      dirNode = dirNode.recordModification(latest, inodeMap);
+      dirNode = dirNode.recordModification(latest);
       dirNode.setQuota(nsQuota, dsQuota);
       return dirNode;
     }
@@ -2461,7 +2451,7 @@ public class FSDirectory implements Clos
     assert hasWriteLock();
     boolean status = false;
     if (mtime != -1) {
-      inode = inode.setModificationTime(mtime, latest, inodeMap);
+      inode = inode.setModificationTime(mtime, latest);
       status = true;
     }
     if (atime != -1) {
@@ -2472,7 +2462,7 @@ public class FSDirectory implements Clos
       if (atime <= inodeTime + getFSNamesystem().getAccessTimePrecision() && !force) {
         status =  false;
       } else {
-        inode.setAccessTime(atime, latest, inodeMap);
+        inode.setAccessTime(atime, latest);
         status = true;
       }
     } 
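
Two themes run through the FSDirectory hunks: the inodeMap argument disappears from the INode mutators (callers no longer thread the map through every modification), and instanceof INodeDirectoryWithSnapshot casts give way to feature queries such as isWithSnapshot(). A hedged sketch of the second pattern, with hypothetical stand-ins for the INode API:

    class SnapshotCheckSketch {
      static class Directory {
        private Object snapshotFeature; // non-null once any snapshot covers this dir
        boolean isWithSnapshot() { return snapshotFeature != null; }
      }

      static void undoRename(Directory parent) {
        if (parent.isWithSnapshot()) {
          // record the undo in the directory's snapshot diff list (elided)
        } else {
          // plain directory: simply re-attach the child (elided)
        }
      }
    }

The feature query keeps a single concrete directory class and asks "does this inode carry snapshot state?", where the old code had to key behavior on a dedicated subclass.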

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Jan  3 07:26:52 2014
@@ -160,10 +160,10 @@ public class FSEditLog implements LogsPu
   private long totalTimeTransactions;  // total time for all transactions
   private NameNodeMetrics metrics;
 
-  private NNStorage storage;
-  private Configuration conf;
+  private final NNStorage storage;
+  private final Configuration conf;
   
-  private List<URI> editsDirs;
+  private final List<URI> editsDirs;
 
   private ThreadLocal<OpInstanceCache> cache =
       new ThreadLocal<OpInstanceCache>() {
@@ -176,7 +176,7 @@ public class FSEditLog implements LogsPu
   /**
    * The edit directories that are shared between primary and secondary.
    */
-  private List<URI> sharedEditsDirs;
+  private final List<URI> sharedEditsDirs;
 
   private static class TransactionId {
     public long txid;
@@ -203,10 +203,6 @@ public class FSEditLog implements LogsPu
    * @param editsDirs List of journals to use
    */
   FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) {
-    init(conf, storage, editsDirs);
-  }
-  
-  private void init(Configuration conf, NNStorage storage, List<URI> editsDirs) {
     isSyncRunning = false;
     this.conf = conf;
     this.storage = storage;
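
Folding init() into the constructor is what allows storage, conf, editsDirs, and sharedEditsDirs to become final above: a field can only be final if every constructor assigns it exactly once, which an init() indirection prevents. A tiny sketch of the same refactor (types and names hypothetical):

    class ConfigHolderSketch {
      static class Config {}

      static class Before {
        private Config conf;               // cannot be final: assigned via init()
        Before(Config c) { init(c); }
        private void init(Config c) { this.conf = c; }
      }

      static class After {
        private final Config conf;         // final: assigned directly in the constructor
        After(Config c) { this.conf = c; }
      }
    }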

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Jan  3 07:26:52 2014
@@ -24,25 +24,27 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Arrays;
 import java.util.EnumMap;
+import java.util.EnumSet;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
@@ -55,11 +57,11 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisallowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCacheDirectiveInfoOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -354,8 +356,8 @@ public class FSEditLogLoader {
       // update the block list.
       
       // Update the salient file attributes.
-      newFile.setAccessTime(addCloseOp.atime, null, fsDir.getINodeMap());
-      newFile.setModificationTime(addCloseOp.mtime, null, fsDir.getINodeMap());
+      newFile.setAccessTime(addCloseOp.atime, null);
+      newFile.setModificationTime(addCloseOp.mtime, null);
       updateBlocks(fsDir, addCloseOp, newFile);
       break;
     }
@@ -373,8 +375,8 @@ public class FSEditLogLoader {
       final INodeFile file = INodeFile.valueOf(iip.getINode(0), addCloseOp.path);
 
       // Update the salient file attributes.
-      file.setAccessTime(addCloseOp.atime, null, fsDir.getINodeMap());
-      file.setModificationTime(addCloseOp.mtime, null, fsDir.getINodeMap());
+      file.setAccessTime(addCloseOp.atime, null);
+      file.setModificationTime(addCloseOp.mtime, null);
       updateBlocks(fsDir, addCloseOp, file);
 
       // Now close the file
@@ -649,8 +651,8 @@ public class FSEditLogLoader {
     case OP_MODIFY_CACHE_DIRECTIVE: {
       ModifyCacheDirectiveInfoOp modifyOp =
           (ModifyCacheDirectiveInfoOp) op;
-      fsNamesys.getCacheManager().modifyDirective(
-          modifyOp.directive, null);
+      fsNamesys.getCacheManager().modifyDirectiveFromEditLog(
+          modifyOp.directive);
       if (toAddRetryCache) {
         fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
       }
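
Replay from the edit log gets its own entry point here: the RPC path previously passed a null permission checker into the general modifyDirective, while modifyDirectiveFromEditLog makes the replay case explicit. A sketch of that two-entry-point pattern, under the assumption (not shown in this diff) that the FromEditLog variant skips the caller-facing validation:

    class DirectiveManagerSketch {
      void modifyDirective(String directive, Object permissionChecker) {
        checkPermissions(directive, permissionChecker); // RPC path: caller is untrusted
        apply(directive);
      }

      void modifyDirectiveFromEditLog(String directive) {
        // Replay path: the record passed validation when it was first logged,
        // so re-checking here could reject legitimate history on restart.
        apply(directive);
      }

      private void checkPermissions(String d, Object pc) { /* elided */ }
      private void apply(String d) { /* elided */ }
    }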


