hadoop-hdfs-commits mailing list archives

From: a..@apache.org
Subject: svn commit: r1547122 [3/5] - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/dev-support/ h...
Date: Mon, 02 Dec 2013 17:41:48 GMT
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Mon Dec  2 17:41:44 2013
@@ -52,7 +52,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
@@ -375,7 +375,7 @@ public class FSImageFormat {
     final long dsQuota = q.get(Quota.DISKSPACE);
     FSDirectory fsDir = namesystem.dir;
     if (nsQuota != -1 || dsQuota != -1) {
-      fsDir.rootDir.setQuota(nsQuota, dsQuota);
+      fsDir.rootDir.getDirectoryWithQuotaFeature().setQuota(nsQuota, dsQuota);
     }
     fsDir.rootDir.cloneModificationTime(root);
     fsDir.rootDir.clonePermissionStatus(root);    
@@ -729,10 +729,11 @@ public class FSImageFormat {
       if (counter != null) {
         counter.increment();
       }
-      final INodeDirectory dir = nsQuota >= 0 || dsQuota >= 0?
-          new INodeDirectoryWithQuota(inodeId, localName, permissions,
-              modificationTime, nsQuota, dsQuota)
-          : new INodeDirectory(inodeId, localName, permissions, modificationTime);
+      final INodeDirectory dir = new INodeDirectory(inodeId, localName,
+          permissions, modificationTime);
+      if (nsQuota >= 0 || dsQuota >= 0) {
+        dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
+      }
       return snapshottable ? new INodeDirectorySnapshottable(dir)
           : withSnapshot ? new INodeDirectoryWithSnapshot(dir)
           : dir;
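
The hunk above replaces the INodeDirectoryWithQuota subclass with composition: quota state now lives in a DirectoryWithQuotaFeature attached to a plain INodeDirectory on demand. A minimal sketch of how a caller reads the quota back under the new scheme, using only methods that appear in this commit (same-package access assumed, since several of them are package-private):

    // Hypothetical caller; construction mirrors the loader logic above.
    INodeDirectory dir = new INodeDirectory(inodeId, localName,
        permissions, modificationTime);
    if (nsQuota >= 0 || dsQuota >= 0) {
      dir.addDirectoryWithQuotaFeature(nsQuota, dsQuota);
    }
    // Quota limits are now reached through the feature, not a subclass:
    DirectoryWithQuotaFeature q = dir.getDirectoryWithQuotaFeature();
    if (q != null) {
      Quota.Counts limits = q.getQuota();  // NAMESPACE and DISKSPACE limits
    }
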
@@ -972,13 +973,14 @@ public class FSImageFormat {
       checkNotSaved();
 
       final FSNamesystem sourceNamesystem = context.getSourceNamesystem();
-      FSDirectory fsDir = sourceNamesystem.dir;
+      final INodeDirectory rootDir = sourceNamesystem.dir.rootDir;
+      final long numINodes = rootDir.getDirectoryWithQuotaFeature()
+          .getSpaceConsumed().get(Quota.NAMESPACE);
       String sdPath = newFile.getParentFile().getParentFile().getAbsolutePath();
       Step step = new Step(StepType.INODES, sdPath);
       StartupProgress prog = NameNode.getStartupProgress();
       prog.beginStep(Phase.SAVING_CHECKPOINT, step);
-      prog.setTotal(Phase.SAVING_CHECKPOINT, step,
-        fsDir.rootDir.numItemsInTree());
+      prog.setTotal(Phase.SAVING_CHECKPOINT, step, numINodes);
       Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
       long startTime = now();
       //
@@ -997,7 +999,7 @@ public class FSImageFormat {
         // fairness-related deadlock. See the comments on HDFS-2223.
         out.writeInt(sourceNamesystem.unprotectedGetNamespaceInfo()
             .getNamespaceID());
-        out.writeLong(fsDir.rootDir.numItemsInTree());
+        out.writeLong(numINodes);
         out.writeLong(sourceNamesystem.getGenerationStampV1());
         out.writeLong(sourceNamesystem.getGenerationStampV2());
         out.writeLong(sourceNamesystem.getGenerationStampAtblockIdSwitch());
@@ -1014,14 +1016,13 @@ public class FSImageFormat {
                  " using " + compression);
 
         // save the root
-        saveINode2Image(fsDir.rootDir, out, false, referenceMap, counter);
+        saveINode2Image(rootDir, out, false, referenceMap, counter);
         // save the rest of the nodes
-        saveImage(fsDir.rootDir, out, true, false, counter);
+        saveImage(rootDir, out, true, false, counter);
         prog.endStep(Phase.SAVING_CHECKPOINT, step);
         // Now that the step is finished, set counter equal to total to adjust
         // for possible under-counting due to reference inodes.
-        prog.setCount(Phase.SAVING_CHECKPOINT, step,
-          fsDir.rootDir.numItemsInTree());
+        prog.setCount(Phase.SAVING_CHECKPOINT, step, numINodes);
         // save files under construction
         // TODO: for HDFS-5428, since we cannot break the compatibility of 
         // fsimage, we store part of the under-construction files that are only
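
A note on the saver hunks above: the inode count is now read once from the root directory's DirectoryWithQuotaFeature instead of calling numItemsInTree(), which walked the whole tree, at three separate points. The lookup that replaces the traversal, relying on the assumed invariant that the root always carries the feature and its cached NAMESPACE consumption equals the total number of inodes:

    // Assumed invariant: cached namespace usage of the root == inode count.
    final long numINodes = rootDir.getDirectoryWithQuotaFeature()
        .getSpaceConsumed().get(Quota.NAMESPACE);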

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Mon Dec  2 17:41:44 2013
@@ -165,6 +165,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
@@ -6422,6 +6423,16 @@ public class FSNamesystem implements Nam
   }
 
   @Override // NameNodeMXBean
+  public long getCacheCapacity() {
+    return datanodeStatistics.getCacheCapacity();
+  }
+
+  @Override // NameNodeMXBean
+  public long getCacheUsed() {
+    return datanodeStatistics.getCacheUsed();
+  }
+
+  @Override // NameNodeMXBean
   public long getTotalBlocks() {
     return getBlocksTotal();
   }
@@ -6627,7 +6638,7 @@ public class FSNamesystem implements Nam
         } else if (openForWrite) {
           EditLogOutputStream elos = jas.getCurrentStream();
           if (elos != null) {
-            jasMap.put("stream", elos.generateHtmlReport());
+            jasMap.put("stream", elos.generateReport());
           } else {
             jasMap.put("stream", "not currently writing");
           }
@@ -7277,11 +7288,11 @@ public class FSNamesystem implements Nam
     getEditLog().logSync();
   }
 
-  public BatchedListEntries<CachePoolInfo> listCachePools(String prevKey)
+  public BatchedListEntries<CachePoolEntry> listCachePools(String prevKey)
       throws IOException {
     final FSPermissionChecker pc =
         isPermissionEnabled ? getPermissionChecker() : null;
-    BatchedListEntries<CachePoolInfo> results;
+    BatchedListEntries<CachePoolEntry> results;
     checkOperation(OperationCategory.READ);
     boolean success = false;
     readLock();

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileUnderConstructionFeature.java Mon Dec  2 17:41:44 2013
@@ -26,7 +26,7 @@ import org.apache.hadoop.hdfs.server.blo
 import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
 
 /**
- * I-node for file being written.
+ * Feature for under-construction file.
  */
 @InterfaceAudience.Private
 public class FileUnderConstructionFeature extends INodeFile.Feature {

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java Mon Dec  2 17:41:44 2013
@@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.DstReference;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference.WithName;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
@@ -315,7 +314,7 @@ public abstract class INode implements I
    * 1.2.2 Else do nothing with the current INode. Recursively clean its 
    * children.
    * 
-   * 1.3 The current inode is a {@link FileWithSnapshot}.
+   * 1.3 The current inode is a file with snapshot.
    * Call recordModification(..) to capture the current states.
    * Mark the INode as deleted.
    * 
@@ -328,7 +327,7 @@ public abstract class INode implements I
    * 2. When deleting a snapshot.
    * 2.1 To clean {@link INodeFile}: do nothing.
    * 2.2 To clean {@link INodeDirectory}: recursively clean its children.
-   * 2.3 To clean {@link FileWithSnapshot}: delete the corresponding snapshot in
+   * 2.3 To clean INodeFile with snapshot: delete the corresponding snapshot in
    * its diff list.
    * 2.4 To clean {@link INodeDirectoryWithSnapshot}: delete the corresponding 
    * snapshot in its diff list. Recursively clean its children.
@@ -406,6 +405,15 @@ public abstract class INode implements I
    */
   public void addSpaceConsumed(long nsDelta, long dsDelta, boolean verify) 
       throws QuotaExceededException {
+    addSpaceConsumed2Parent(nsDelta, dsDelta, verify);
+  }
+
+  /**
+   * Check and add namespace/diskspace consumed to itself and the ancestors.
+   * @throws QuotaExceededException if quota is violated.
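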
+   */
+  void addSpaceConsumed2Parent(long nsDelta, long dsDelta, boolean verify) 
+      throws QuotaExceededException {
     if (parent != null) {
       parent.addSpaceConsumed(nsDelta, dsDelta, verify);
     }
@@ -744,4 +752,51 @@ public abstract class INode implements I
       toDeleteList.clear();
     }
   }
+
+  /** INode feature such as {@link FileUnderConstructionFeature}
+   *  and {@link DirectoryWithQuotaFeature}.
+   */
+  interface Feature<F extends Feature<F>> {
+    /** @return the next feature. */
+    public F getNextFeature();
+
+    /** Set the next feature. */
+    public void setNextFeature(F next);
+
+    /** Utility methods such as addFeature and removeFeature. */
+    static class Util {
+      /**
+       * Add a feature to the linked list.
+       * @return the new head.
+       */
+      static <F extends Feature<F>> F addFeature(F feature, F head) {
+        feature.setNextFeature(head);
+        return feature;
+      }
+
+      /**
+       * Remove a feature from the linked list.
+       * @return the new head.
+       */
+      static <F extends Feature<F>> F removeFeature(F feature, F head) {
+        if (feature == head) {
+          final F newHead = head.getNextFeature();
+          head.setNextFeature(null);
+          return newHead;
+        } else if (head != null) {
+          F prev = head;
+          F curr = head.getNextFeature();
+          for (; curr != null && curr != feature;
+              prev = curr, curr = curr.getNextFeature())
+            ;
+          if (curr != null) {
+            prev.setNextFeature(curr.getNextFeature());
+            curr.setNextFeature(null);
+            return head;
+          }
+        }
+        throw new IllegalStateException("Feature " + feature + " not found.");
+      }
+    }
+  }
 }
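
The new INode.Feature interface factors out the intrusive singly-linked list that INodeFile previously managed inline (see the INodeFile.java hunks below). A standalone sketch of the list discipline, using a hypothetical DummyFeature defined purely for illustration (same-package access assumed, since the interface is package-private):

    // Hypothetical feature type, for illustration only.
    class DummyFeature implements INode.Feature<DummyFeature> {
      private DummyFeature next;
      @Override
      public DummyFeature getNextFeature() { return next; }
      @Override
      public void setNextFeature(DummyFeature n) { this.next = n; }
    }

    DummyFeature head = null;
    DummyFeature a = new DummyFeature();
    DummyFeature b = new DummyFeature();
    head = INode.Feature.Util.addFeature(a, head);     // list is now: a
    head = INode.Feature.Util.addFeature(b, head);     // list is now: b -> a
    head = INode.Feature.Util.removeFeature(a, head);  // list is now: b
    // removeFeature on a feature not in the list throws IllegalStateException.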

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java Mon Dec  2 17:41:44 2013
@@ -46,6 +46,21 @@ import com.google.common.base.Preconditi
  */
 public class INodeDirectory extends INodeWithAdditionalFields
     implements INodeDirectoryAttributes {
+  /** Directory related features such as quota and snapshots. */
+  public static abstract class Feature implements INode.Feature<Feature> {
+    private Feature nextFeature;
+
+    @Override
+    public Feature getNextFeature() {
+      return nextFeature;
+    }
+
+    @Override
+    public void setNextFeature(Feature next) {
+      this.nextFeature = next;
+    }
+  }
+
   /** Cast INode to INodeDirectory. */
   public static INodeDirectory valueOf(INode inode, Object path
       ) throws FileNotFoundException, PathIsNotDirectoryException {
@@ -63,6 +78,9 @@ public class INodeDirectory extends INod
   final static byte[] ROOT_NAME = DFSUtil.string2Bytes("");
 
   private List<INode> children = null;
+  
+  /** A linked list of {@link Feature}s. */
+  private Feature headFeature = null;
 
   /** constructor */
   public INodeDirectory(long id, byte[] name, PermissionStatus permissions,
@@ -76,7 +94,7 @@ public class INodeDirectory extends INod
    * @param adopt Indicate whether or not need to set the parent field of child
    *              INodes to the new node
    */
-  public INodeDirectory(INodeDirectory other, boolean adopt) {
+  public INodeDirectory(INodeDirectory other, boolean adopt, boolean copyFeatures) {
     super(other);
     this.children = other.children;
     if (adopt && this.children != null) {
@@ -84,6 +102,9 @@ public class INodeDirectory extends INod
         child.setParent(this);
       }
     }
+    if (copyFeatures) {
+      this.headFeature = other.headFeature;
+    }
   }
 
   /** @return true unconditionally. */
@@ -103,6 +124,73 @@ public class INodeDirectory extends INod
     return false;
   }
 
+  void setQuota(long nsQuota, long dsQuota) {
+    DirectoryWithQuotaFeature quota = getDirectoryWithQuotaFeature();
+    if (quota != null) {
+      // already has quota; so set the quota to the new values
+      quota.setQuota(nsQuota, dsQuota);
+      if (!isQuotaSet() && !isRoot()) {
+        removeFeature(quota);
+      }
+    } else {
+      final Quota.Counts c = computeQuotaUsage();
+      quota = addDirectoryWithQuotaFeature(nsQuota, dsQuota);
+      quota.setSpaceConsumed(c.get(Quota.NAMESPACE), c.get(Quota.DISKSPACE));
+    }
+  }
+
+  @Override
+  public Quota.Counts getQuotaCounts() {
+    final DirectoryWithQuotaFeature q = getDirectoryWithQuotaFeature();
+    return q != null? q.getQuota(): super.getQuotaCounts();
+  }
+
+  @Override
+  public void addSpaceConsumed(long nsDelta, long dsDelta, boolean verify) 
+      throws QuotaExceededException {
+    final DirectoryWithQuotaFeature q = getDirectoryWithQuotaFeature();
+    if (q != null) {
+      q.addSpaceConsumed(this, nsDelta, dsDelta, verify);
+    } else {
+      addSpaceConsumed2Parent(nsDelta, dsDelta, verify);
+    }
+  }
+
+  /**
+   * If the directory contains a {@link DirectoryWithQuotaFeature}, return it;
+   * otherwise, return null.
+   */
+  public final DirectoryWithQuotaFeature getDirectoryWithQuotaFeature() {
+    for(Feature f = headFeature; f != null; f = f.nextFeature) {
+      if (f instanceof DirectoryWithQuotaFeature) {
+        return (DirectoryWithQuotaFeature)f;
+      }
+    }
+    return null;
+  }
+
+  /** Does this directory have the quota feature attached? */
+  final boolean isWithQuota() {
+    return getDirectoryWithQuotaFeature() != null;
+  }
+
+  DirectoryWithQuotaFeature addDirectoryWithQuotaFeature(
+      long nsQuota, long dsQuota) {
+    Preconditions.checkState(!isWithQuota(), "Directory is already with quota");
+    final DirectoryWithQuotaFeature quota = new DirectoryWithQuotaFeature(
+        nsQuota, dsQuota);
+    addFeature(quota);
+    return quota;
+  }
+
+  private void addFeature(Feature f) {
+    headFeature = INode.Feature.Util.addFeature(f, headFeature);
+  }
+
+  private void removeFeature(Feature f) {
+    headFeature = INode.Feature.Util.removeFeature(f, headFeature);
+  }
+
   private int searchChildren(byte[] name) {
     return children == null? -1: Collections.binarySearch(children, name);
   }
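
With the replaceSelf4Quota() machinery removed (next hunk), setQuota() simply toggles the feature in place: setting a limit attaches a DirectoryWithQuotaFeature seeded with the current usage, and clearing both limits detaches it again on non-root directories. A sketch of the state transitions (same-package access assumed), also assuming isQuotaSet() is true whenever either limit is non-negative, consistent with the loader hunk in FSImageFormat.java above:

    dir.setQuota(100, 8L << 30);  // attaches the feature, seeds space consumed
    assert dir.getDirectoryWithQuotaFeature() != null;

    dir.setQuota(-1, -1);         // both limits cleared on a non-root dir...
    assert dir.getDirectoryWithQuotaFeature() == null;  // ...feature detached
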
@@ -142,27 +230,6 @@ public class INodeDirectory extends INod
     return true;
   }
 
-  /**
-   * Replace itself with {@link INodeDirectoryWithQuota} or
-   * {@link INodeDirectoryWithSnapshot} depending on the latest snapshot.
-   */
-  INodeDirectoryWithQuota replaceSelf4Quota(final Snapshot latest,
-      final long nsQuota, final long dsQuota, final INodeMap inodeMap)
-      throws QuotaExceededException {
-    Preconditions.checkState(!(this instanceof INodeDirectoryWithQuota),
-        "this is already an INodeDirectoryWithQuota, this=%s", this);
-
-    if (!this.isInLatestSnapshot(latest)) {
-      final INodeDirectoryWithQuota q = new INodeDirectoryWithQuota(
-          this, true, nsQuota, dsQuota);
-      replaceSelf(q, inodeMap);
-      return q;
-    } else {
-      final INodeDirectoryWithSnapshot s = new INodeDirectoryWithSnapshot(this);
-      s.setQuota(nsQuota, dsQuota);
-      return replaceSelf(s, inodeMap).saveSelf2Snapshot(latest, this);
-    }
-  }
   /** Replace itself with an {@link INodeDirectorySnapshottable}. */
   public INodeDirectorySnapshottable replaceSelf4INodeDirectorySnapshottable(
       Snapshot latest, final INodeMap inodeMap) throws QuotaExceededException {
@@ -183,7 +250,7 @@ public class INodeDirectory extends INod
   public INodeDirectory replaceSelf4INodeDirectory(final INodeMap inodeMap) {
     Preconditions.checkState(getClass() != INodeDirectory.class,
         "the class is already INodeDirectory, this=%s", this);
-    return replaceSelf(new INodeDirectory(this, true), inodeMap);
+    return replaceSelf(new INodeDirectory(this, true, true), inodeMap);
   }
 
   /** Replace itself with the given directory. */
@@ -439,6 +506,21 @@ public class INodeDirectory extends INod
   @Override
   public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
       int lastSnapshotId) {
+    final DirectoryWithQuotaFeature q = getDirectoryWithQuotaFeature();
+    if (q != null) {
+      if (useCache && isQuotaSet()) {
+        q.addNamespaceDiskspace(counts);
+      } else {
+        computeDirectoryQuotaUsage(counts, false, lastSnapshotId);
+      }
+      return counts;
+    } else {
+      return computeDirectoryQuotaUsage(counts, useCache, lastSnapshotId);
+    }
+  }
+
+  Quota.Counts computeDirectoryQuotaUsage(Quota.Counts counts, boolean useCache,
+      int lastSnapshotId) {
     if (children != null) {
       for (INode child : children) {
         child.computeQuotaUsage(counts, useCache, lastSnapshotId);
@@ -456,6 +538,16 @@ public class INodeDirectory extends INod
   @Override
   public ContentSummaryComputationContext computeContentSummary(
       ContentSummaryComputationContext summary) {
+    final DirectoryWithQuotaFeature q = getDirectoryWithQuotaFeature();
+    if (q != null) {
+      return q.computeContentSummary(this, summary);
+    } else {
+      return computeDirectoryContentSummary(summary);
+    }
+  }
+
+  ContentSummaryComputationContext computeDirectoryContentSummary(
+      ContentSummaryComputationContext summary) {
     ReadOnlyList<INode> childrenList = getChildrenList(null);
     // Explicit traversing is done to enable repositioning after relinquishing
     // and reacquiring locks.
@@ -570,7 +662,7 @@ public class INodeDirectory extends INod
       Quota.Counts counts = cleanSubtreeRecursively(snapshot, prior,
           collectedBlocks, removedINodes, null, countDiffChange);
       if (isQuotaSet()) {
-        ((INodeDirectoryWithQuota) this).addSpaceConsumed2Cache(
+        getDirectoryWithQuotaFeature().addSpaceConsumed2Cache(
             -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
       }
       return counts;
@@ -606,8 +698,9 @@ public class INodeDirectory extends INod
       final Snapshot snapshot) {
     super.dumpTreeRecursively(out, prefix, snapshot);
     out.print(", childrenSize=" + getChildrenList(snapshot).size());
-    if (this instanceof INodeDirectoryWithQuota) {
-      out.print(((INodeDirectoryWithQuota)this).quotaString());
+    final DirectoryWithQuotaFeature q = getDirectoryWithQuotaFeature();
+    if (q != null) {
+      out.print(", " + q);
     }
     if (this instanceof Snapshot.Root) {
       out.print(", snapshotId=" + snapshot.getId());

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Mon Dec  2 17:41:44 2013
@@ -29,10 +29,8 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.server.blockmanagement.*;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.Util;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiff;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
@@ -47,13 +45,15 @@ public class INodeFile extends INodeWith
    * A feature contains specific information for a type of INodeFile. E.g.,
    * we can have separate features for Under-Construction and Snapshot.
    */
-  public static abstract class Feature {
+  public static abstract class Feature implements INode.Feature<Feature> {
     private Feature nextFeature;
 
+    @Override
     public Feature getNextFeature() {
       return nextFeature;
     }
 
+    @Override
     public void setNextFeature(Feature next) {
       this.nextFeature = next;
     }
@@ -157,26 +157,12 @@ public class INodeFile extends INodeWith
     return getFileUnderConstructionFeature() != null;
   }
 
-  void addFeature(Feature f) {
-    f.nextFeature = headFeature;
-    headFeature = f;
+  private void addFeature(Feature f) {
+    headFeature = INode.Feature.Util.addFeature(f, headFeature);
   }
 
-  void removeFeature(Feature f) {
-    if (f == headFeature) {
-      headFeature = headFeature.nextFeature;
-      return;
-    } else if (headFeature != null) {
-      Feature prev = headFeature;
-      Feature curr = headFeature.nextFeature;
-      for (; curr != null && curr != f; prev = curr, curr = curr.nextFeature)
-        ;
-      if (curr != null) {
-        prev.nextFeature = curr.nextFeature;
-        return;
-      }
-    }
-    throw new IllegalStateException("Feature " + f + " not found.");
+  private void removeFeature(Feature f) {
+    headFeature = INode.Feature.Util.removeFeature(f, headFeature);
   }
 
   /** @return true unconditionally. */
@@ -194,10 +180,10 @@ public class INodeFile extends INodeWith
   /* Start of Under-Construction Feature */
 
   /** Convert this file to an {@link INodeFileUnderConstruction}. */
-  public INodeFile toUnderConstruction(String clientName, String clientMachine,
+  INodeFile toUnderConstruction(String clientName, String clientMachine,
       DatanodeDescriptor clientNode) {
     Preconditions.checkState(!isUnderConstruction(),
-        "file is already an INodeFileUnderConstruction");
+        "file is already under construction");
     FileUnderConstructionFeature uc = new FileUnderConstructionFeature(
         clientName, clientMachine, clientNode);
     addFeature(uc);
@@ -209,6 +195,8 @@ public class INodeFile extends INodeWith
    * feature.
    */
   public INodeFile toCompleteFile(long mtime) {
+    Preconditions.checkState(isUnderConstruction(),
+        "file is no longer under construction");
     FileUnderConstructionFeature uc = getFileUnderConstructionFeature();
     if (uc != null) {
       assertAllBlocksComplete();
@@ -230,15 +218,16 @@ public class INodeFile extends INodeWith
     }
   }
 
-  @Override //BlockCollection
+  @Override // BlockCollection
   public void setBlock(int index, BlockInfo blk) {
     this.blocks[index] = blk;
   }
 
-  @Override // BlockCollection
+  @Override // BlockCollection, the file should be under construction
   public BlockInfoUnderConstruction setLastBlock(BlockInfo lastBlock,
       DatanodeStorageInfo[] locations) throws IOException {
-    Preconditions.checkState(isUnderConstruction());
+    Preconditions.checkState(isUnderConstruction(),
+        "file is no longer under construction");
 
     if (numBlocks() == 0) {
       throw new IOException("Failed to set last block: File is empty.");
@@ -256,6 +245,8 @@ public class INodeFile extends INodeWith
    * the last one on the list.
    */
   boolean removeLastBlock(Block oldblock) {
+    Preconditions.checkState(isUnderConstruction(),
+        "file is no longer under construction");
     if (blocks == null || blocks.length == 0) {
       return false;
     }
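
The added Preconditions.checkState() calls make the under-construction lifecycle explicit: a file gains a FileUnderConstructionFeature exactly once via toUnderConstruction() and sheds it in toCompleteFile(), and the block-mutation paths now fail fast if that ordering is violated. A hedged sketch of the lifecycle (same-package access assumed; toUnderConstruction() became package-private in this commit):

    static void lifecycle(INodeFile f, DatanodeDescriptor node, long mtime) {
      f.toUnderConstruction("clientA", "host1", node);  // attaches UC feature
      assert f.isUnderConstruction();
      f.toCompleteFile(mtime);                          // detaches UC feature
      assert !f.isUnderConstruction();
      // A second toCompleteFile(mtime) would now fail the new precondition.
    }
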
@@ -307,10 +298,8 @@ public class INodeFile extends INodeWith
   }
 
   @Override
-  public final short getBlockReplication() {
-    return this instanceof FileWithSnapshot?
-        Util.getBlockReplication((FileWithSnapshot)this)
-        : getFileReplication(null);
+  public short getBlockReplication() {
+    return getFileReplication(null);
   }
 
   /** Set the replication factor of this file. */
@@ -430,8 +419,8 @@ public class INodeFile extends INodeWith
     clear();
     removedINodes.add(this);
     
-    if (this instanceof FileWithSnapshot) {
-      ((FileWithSnapshot) this).getDiffs().clear();
+    if (this instanceof INodeFileWithSnapshot) {
+      ((INodeFileWithSnapshot) this).getDiffs().clear();
     }
   }
   
@@ -446,8 +435,8 @@ public class INodeFile extends INodeWith
       boolean useCache, int lastSnapshotId) {
     long nsDelta = 1;
     final long dsDelta;
-    if (this instanceof FileWithSnapshot) {
-      FileDiffList fileDiffList = ((FileWithSnapshot) this).getDiffs();
+    if (this instanceof INodeFileWithSnapshot) {
+      FileDiffList fileDiffList = ((INodeFileWithSnapshot) this).getDiffs();
       Snapshot last = fileDiffList.getLastSnapshot();
       List<FileDiff> diffs = fileDiffList.asList();
 
@@ -479,8 +468,8 @@ public class INodeFile extends INodeWith
   private void computeContentSummary4Snapshot(final Content.Counts counts) {
     // file length and diskspace only counted for the latest state of the file
     // i.e. either the current state or the last snapshot
-    if (this instanceof FileWithSnapshot) {
-      final FileWithSnapshot withSnapshot = (FileWithSnapshot)this;
+    if (this instanceof INodeFileWithSnapshot) {
+      final INodeFileWithSnapshot withSnapshot = (INodeFileWithSnapshot) this;
       final FileDiffList diffs = withSnapshot.getDiffs();
       final int n = diffs.asList().size();
       counts.add(Content.FILE, n);
@@ -496,8 +485,8 @@ public class INodeFile extends INodeWith
   }
 
   private void computeContentSummary4Current(final Content.Counts counts) {
-    if (this instanceof FileWithSnapshot
-        && ((FileWithSnapshot)this).isCurrentFileDeleted()) {
+    if (this instanceof INodeFileWithSnapshot
+        && ((INodeFileWithSnapshot) this).isCurrentFileDeleted()) {
       return;
     }
 
@@ -516,8 +505,9 @@ public class INodeFile extends INodeWith
    * otherwise, get the file size from the given snapshot.
    */
   public final long computeFileSize(Snapshot snapshot) {
-    if (snapshot != null && this instanceof FileWithSnapshot) {
-      final FileDiff d = ((FileWithSnapshot)this).getDiffs().getDiff(snapshot);
+    if (snapshot != null && this instanceof INodeFileWithSnapshot) {
+      final FileDiff d = ((INodeFileWithSnapshot) this).getDiffs().getDiff(
+          snapshot);
       if (d != null) {
         return d.getFileSize();
       }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java Mon Dec  2 17:41:44 2013
@@ -26,8 +26,8 @@ import java.util.List;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 import com.google.common.base.Preconditions;
@@ -102,8 +102,8 @@ public abstract class INodeReference ext
     }
     if (wn != null) {
       INode referred = wc.getReferredINode();
-      if (referred instanceof FileWithSnapshot) {
-        return ((FileWithSnapshot) referred).getDiffs().getPrior(
+      if (referred instanceof INodeFileWithSnapshot) {
+        return ((INodeFileWithSnapshot) referred).getDiffs().getPrior(
             wn.lastSnapshotId);
       } else if (referred instanceof INodeDirectoryWithSnapshot) { 
         return ((INodeDirectoryWithSnapshot) referred).getDiffs().getPrior(
@@ -547,8 +547,8 @@ public abstract class INodeReference ext
     private Snapshot getSelfSnapshot() {
       INode referred = getReferredINode().asReference().getReferredINode();
       Snapshot snapshot = null;
-      if (referred instanceof FileWithSnapshot) {
-        snapshot = ((FileWithSnapshot) referred).getDiffs().getPrior(
+      if (referred instanceof INodeFileWithSnapshot) {
+        snapshot = ((INodeFileWithSnapshot) referred).getDiffs().getPrior(
             lastSnapshotId);
       } else if (referred instanceof INodeDirectoryWithSnapshot) {
         snapshot = ((INodeDirectoryWithSnapshot) referred).getDiffs().getPrior(
@@ -637,10 +637,10 @@ public abstract class INodeReference ext
         Snapshot snapshot = getSelfSnapshot(prior);
         
         INode referred = getReferredINode().asReference().getReferredINode();
-        if (referred instanceof FileWithSnapshot) {
+        if (referred instanceof INodeFileWithSnapshot) {
           // if referred is a file, it must be a FileWithSnapshot since we did
           // recordModification before the rename
-          FileWithSnapshot sfile = (FileWithSnapshot) referred;
+          INodeFileWithSnapshot sfile = (INodeFileWithSnapshot) referred;
           // make sure we mark the file as deleted
           sfile.deleteCurrentFile();
           try {
@@ -671,8 +671,8 @@ public abstract class INodeReference ext
       WithCount wc = (WithCount) getReferredINode().asReference();
       INode referred = wc.getReferredINode();
       Snapshot lastSnapshot = null;
-      if (referred instanceof FileWithSnapshot) {
-        lastSnapshot = ((FileWithSnapshot) referred).getDiffs()
+      if (referred instanceof INodeFileWithSnapshot) {
+        lastSnapshot = ((INodeFileWithSnapshot) referred).getDiffs()
             .getLastSnapshot(); 
       } else if (referred instanceof INodeDirectoryWithSnapshot) {
         lastSnapshot = ((INodeDirectoryWithSnapshot) referred)

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon Dec  2 17:41:44 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.ha.HealthCheckF
 import org.apache.hadoop.ha.ServiceFailedException;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Trash;
+
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.ToolRunner.confirmPrompt;

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Mon Dec  2 17:41:44 2013
@@ -21,6 +21,7 @@ import static org.apache.hadoop.hdfs.DFS
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -69,25 +70,45 @@ public class NameNodeHttpServer {
     this.bindAddress = bindAddress;
   }
   
-  public void start() throws IOException {
+  void start() throws IOException {
     final String infoHost = bindAddress.getHostName();
     int infoPort = bindAddress.getPort();
-    httpServer = new HttpServer.Builder().setName("hdfs")
-        .setBindAddress(infoHost).setPort(infoPort)
+    HttpServer.Builder builder = new HttpServer.Builder().setName("hdfs")
+        .addEndpoint(URI.create(("http://" + NetUtils.getHostPortString(bindAddress))))
         .setFindPort(infoPort == 0).setConf(conf).setACL(
             new AccessControlList(conf.get(DFS_ADMIN, " ")))
         .setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
         .setUsernameConfKey(
             DFSConfigKeys.DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY)
         .setKeytabConfKey(DFSUtil.getSpnegoKeytabKey(conf,
-            DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY)).build();
+            DFSConfigKeys.DFS_NAMENODE_KEYTAB_FILE_KEY));
+
+    boolean certSSL = conf.getBoolean(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY, false);
+    if (certSSL) {
+      httpsAddress = NetUtils.createSocketAddr(conf.get(
+          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
+          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT));
+
+      builder.addEndpoint(URI.create("https://"
+          + NetUtils.getHostPortString(httpsAddress)));
+      Configuration sslConf = new Configuration(false);
+      sslConf.setBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY, conf
+          .getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
+              DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT));
+      sslConf.addResource(conf.get(
+          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
+          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
+      DFSUtil.loadSslConfToHttpServerBuilder(builder, sslConf);
+    }
+
+    httpServer = builder.build();
     if (WebHdfsFileSystem.isEnabled(conf, HttpServer.LOG)) {
       //add SPNEGO authentication filter for webhdfs
       final String name = "SPNEGO";
       final String classname = AuthFilter.class.getName();
       final String pathSpec = WebHdfsFileSystem.PATH_PREFIX + "/*";
       Map<String, String> params = getAuthFilterParams(conf);
-      httpServer.defineFilter(httpServer.getWebAppContext(), name, classname, params,
+      HttpServer.defineFilter(httpServer.getWebAppContext(), name, classname, params,
           new String[]{pathSpec});
       HttpServer.LOG.info("Added filter '" + name + "' (class=" + classname + ")");
 
@@ -97,34 +118,19 @@ public class NameNodeHttpServer {
           + ";" + Param.class.getPackage().getName(), pathSpec);
       }
 
-    boolean certSSL = conf.getBoolean(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY, false);
+    httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
+    httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
+    setupServlets(httpServer, conf);
+    httpServer.start();
+    httpAddress = httpServer.getConnectorAddress(0);
     if (certSSL) {
-      boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
-      httpsAddress = NetUtils.createSocketAddr(conf.get(
-          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
-          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT));
-
-      Configuration sslConf = new Configuration(false);
-      sslConf.addResource(conf.get(
-          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
-          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
-      httpServer.addSslListener(httpsAddress, sslConf, needClientAuth);
+      httpsAddress = httpServer.getConnectorAddress(1);
       // assume same ssl port for all datanodes
       InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.get(
         DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 50475));
       httpServer.setAttribute(DFSConfigKeys.DFS_DATANODE_HTTPS_PORT_KEY, datanodeSslPort
         .getPort());
     }
-    httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
-    httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
-    setupServlets(httpServer, conf);
-    httpServer.start();
-    httpAddress = new InetSocketAddress(bindAddress.getAddress(),
-        httpServer.getPort());
-    if (certSSL) {
-      httpsAddress = new InetSocketAddress(bindAddress.getAddress(),
-          httpServer.getConnectorPort(1));
-    }
   }
   
   private Map<String, String> getAuthFilterParams(Configuration conf)
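
The rewrite above moves NameNodeHttpServer to the endpoint-oriented HttpServer.Builder API: each scheme is registered as a URI endpoint up front, and bound addresses are read back by connector index after start(), replacing the old setBindAddress()/setPort()/addSslListener() sequence. A condensed sketch of the new wiring, using only calls visible in this hunk:

    HttpServer.Builder b = new HttpServer.Builder().setName("hdfs")
        .addEndpoint(URI.create("http://" + NetUtils.getHostPortString(bindAddress)));
    if (certSSL) {
      b.addEndpoint(URI.create("https://" + NetUtils.getHostPortString(httpsAddress)));
    }
    HttpServer server = b.build();
    server.start();
    // Connector index follows endpoint registration order: 0 = http, 1 = https.
    InetSocketAddress http = server.getConnectorAddress(0);
    InetSocketAddress https = certSSL ? server.getConnectorAddress(1) : null;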

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeMXBean.java Mon Dec  2 17:41:44 2013
@@ -101,6 +101,16 @@ public interface NameNodeMXBean {
    * @return the percentage of the remaining space on the cluster
    */
   public float getPercentRemaining();
+
+  /**
+   * Returns the amount of cache used by the datanode (in bytes).
+   */
+  public long getCacheUsed();
+
+  /**
+   * Returns the total cache capacity of the datanode (in bytes).
+   */
+  public long getCacheCapacity();
   
   /**
    * Get the total space used by the block pools of this namenode
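
The two new NameNodeMXBean attributes expose aggregate datanode cache statistics (backed by DatanodeStatistics in the FSNamesystem hunk above). A hedged client-side sketch of reading them over JMX; the ObjectName below follows the usual Hadoop "Hadoop:service=NameNode,name=NameNodeInfo" registration pattern but is an assumption, not something stated in this diff:

    import java.lang.management.ManagementFactory;
    import javax.management.JMX;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
    ObjectName name =
        new ObjectName("Hadoop:service=NameNode,name=NameNodeInfo");  // assumed
    NameNodeMXBean nn = JMX.newMXBeanProxy(mbs, name, NameNodeMXBean.class);
    long used = nn.getCacheUsed();          // bytes currently cached, all DNs
    long capacity = nn.getCacheCapacity();  // total DN cache capacity, bytes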

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Mon Dec  2 17:41:44 2013
@@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -1301,26 +1302,26 @@ class NameNodeRpcServer implements Namen
   }
 
   private class ServerSideCachePoolIterator 
-      extends BatchedRemoteIterator<String, CachePoolInfo> {
+      extends BatchedRemoteIterator<String, CachePoolEntry> {
 
     public ServerSideCachePoolIterator(String prevKey) {
       super(prevKey);
     }
 
     @Override
-    public BatchedEntries<CachePoolInfo> makeRequest(String prevKey)
+    public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
         throws IOException {
       return namesystem.listCachePools(prevKey);
     }
 
     @Override
-    public String elementToPrevKey(CachePoolInfo element) {
-      return element.getPoolName();
+    public String elementToPrevKey(CachePoolEntry entry) {
+      return entry.getInfo().getPoolName();
     }
   }
 
   @Override
-  public RemoteIterator<CachePoolInfo> listCachePools(String prevKey)
+  public RemoteIterator<CachePoolEntry> listCachePools(String prevKey)
       throws IOException {
     return new ServerSideCachePoolIterator(prevKey);
   }
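
Callers of listCachePools() now receive CachePoolEntry objects rather than bare CachePoolInfo, with the pool name reached through getInfo() as in elementToPrevKey() above. A sketch of consuming the iterator, where "server" is a hypothetical handle to this RPC server and the empty-string start key is an assumption:

    RemoteIterator<CachePoolEntry> it = server.listCachePools("");
    while (it.hasNext()) {
      CachePoolEntry entry = it.next();
      System.out.println(entry.getInfo().getPoolName());
    }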

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Mon Dec  2 17:41:44 2013
@@ -36,6 +36,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.BlockReaderFactory;
@@ -46,9 +47,11 @@ import org.apache.hadoop.hdfs.net.TcpPee
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -139,6 +142,9 @@ public class NamenodeFsck {
 
   private final Configuration conf;
   private final PrintWriter out;
+  private List<String> snapshottableDirs = null;
+
+  private BlockPlacementPolicy bpPolicy;
 
   /**
    * Filesystem checker.
@@ -162,6 +168,8 @@ public class NamenodeFsck {
     this.totalDatanodes = totalDatanodes;
     this.minReplication = minReplication;
     this.remoteAddress = remoteAddress;
+    this.bpPolicy = BlockPlacementPolicy.getInstance(conf, null,
+        networktopology);
 
     for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
       String key = it.next();
@@ -178,6 +186,8 @@ public class NamenodeFsck {
       }
       else if (key.equals("startblockafter")) {
         this.currentCookie[0] = pmap.get("startblockafter")[0];
+      } else if (key.equals("includeSnapshots")) {
+        this.snapshottableDirs = new ArrayList<String>();
       }
     }
   }
@@ -194,6 +204,16 @@ public class NamenodeFsck {
       out.println(msg);
       namenode.getNamesystem().logFsckEvent(path, remoteAddress);
 
+      if (snapshottableDirs != null) {
+        SnapshottableDirectoryStatus[] snapshotDirs = namenode.getRpcServer()
+            .getSnapshottableDirListing();
+        if (snapshotDirs != null) {
+          for (SnapshottableDirectoryStatus dir : snapshotDirs) {
+            snapshottableDirs.add(dir.getFullPath().toString());
+          }
+        }
+      }
+
       final HdfsFileStatus file = namenode.getRpcServer().getFileInfo(path);
       if (file != null) {
 
@@ -272,6 +292,14 @@ public class NamenodeFsck {
     boolean isOpen = false;
 
     if (file.isDir()) {
+      if (snapshottableDirs != null && snapshottableDirs.contains(path)) {
+        String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path : path
+            + Path.SEPARATOR)
+            + HdfsConstants.DOT_SNAPSHOT_DIR;
+        HdfsFileStatus snapshotFileInfo = namenode.getRpcServer().getFileInfo(
+            snapshotPath);
+        check(snapshotPath, snapshotFileInfo, res);
+      }
       byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;
       DirectoryListing thisListing;
       if (showFiles) {
@@ -375,9 +403,8 @@ public class NamenodeFsck {
                     locs.length + " replica(s).");
       }
       // verify block placement policy
-      BlockPlacementStatus blockPlacementStatus = 
-          BlockPlacementPolicy.getInstance(conf, null, networktopology).
-              verifyBlockPlacement(path, lBlk, targetFileReplication);
+      BlockPlacementStatus blockPlacementStatus = bpPolicy
+          .verifyBlockPlacement(path, lBlk, targetFileReplication);
       if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
         res.numMisReplicatedBlocks++;
         misReplicatedPerFile++;
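
With the "includeSnapshots" parameter handled above, fsck collects the snapshottable directory listing up front and, on reaching such a directory, also checks its ".snapshot" subtree (HdfsConstants.DOT_SNAPSHOT_DIR). A worked example of the path construction in the hunk above:

    // path "/data" -> "/data/.snapshot"
    // path "/"     -> "/.snapshot"   (separator already present)
    String snapshotPath = (path.endsWith(Path.SEPARATOR) ? path
        : path + Path.SEPARATOR) + HdfsConstants.DOT_SNAPSHOT_DIR;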

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Mon Dec  2 17:41:44 2013
@@ -335,7 +335,7 @@ class NamenodeJspHelper {
         } else if (openForWrite) {
           EditLogOutputStream elos = jas.getCurrentStream();
           if (elos != null) {
-            out.println(elos.generateHtmlReport());
+            out.println(elos.generateReport());
           } else {
             out.println("not currently writing");
           }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Mon Dec  2 17:41:44 2013
@@ -30,6 +30,7 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
@@ -214,7 +215,7 @@ public class SecondaryNameNode implement
   
   /**
    * Initialize SecondaryNameNode.
-   * @param commandLineOpts 
+   * @param commandLineOpts
    */
   private void initialize(final Configuration conf,
       CommandLineOpts commandLineOpts) throws IOException {
@@ -256,8 +257,15 @@ public class SecondaryNameNode implement
 
     // initialize the webserver for uploading files.
     int tmpInfoPort = infoSocAddr.getPort();
+    URI httpEndpoint;
+    try {
+      httpEndpoint = new URI("http://" + NetUtils.getHostPortString(infoSocAddr));
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+
     infoServer = new HttpServer.Builder().setName("secondary")
-        .setBindAddress(infoBindAddress).setPort(tmpInfoPort)
+        .addEndpoint(httpEndpoint)
         .setFindPort(tmpInfoPort == 0).setConf(conf).setACL(
             new AccessControlList(conf.get(DFS_ADMIN, " ")))
         .setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
@@ -275,7 +283,7 @@ public class SecondaryNameNode implement
     LOG.info("Web server init done");
 
     // The web-server port can be ephemeral... ensure we have the correct info
-    infoPort = infoServer.getPort();
+    infoPort = infoServer.getConnectorAddress(0).getPort();
 
     conf.set(DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, infoBindAddress + ":" + infoPort);
     LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" + infoPort);

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java Mon Dec  2 17:41:44 2013
@@ -35,7 +35,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.http.HttpConfig;
-import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.server.com
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.MD5Hash;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -62,6 +64,15 @@ public class TransferFsImage {
   public final static String MD5_HEADER = "X-MD5-Digest";
   @VisibleForTesting
   static int timeout = 0;
+  private static URLConnectionFactory connectionFactory;
+  private static boolean isSpnegoEnabled;
+
+  static {
+    Configuration conf = new Configuration();
+    connectionFactory = URLConnectionFactory
+        .newDefaultURLConnectionFactory(conf);
+    isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
+  }
 
   private static final Log LOG = LogFactory.getLog(TransferFsImage.class);
   
@@ -250,8 +261,13 @@ public class TransferFsImage {
   public static MD5Hash doGetUrl(URL url, List<File> localPaths,
       Storage dstStorage, boolean getChecksum) throws IOException {
     long startTime = Time.monotonicNow();
-    HttpURLConnection connection = (HttpURLConnection)
-      SecurityUtil.openSecureHttpConnection(url);
+    HttpURLConnection connection;
+    try {
+      connection = (HttpURLConnection)
+        connectionFactory.openConnection(url, isSpnegoEnabled);
+    } catch (AuthenticationException e) {
+      throw new IOException(e);
+    }
 
     if (timeout <= 0) {
       Configuration conf = new HdfsConfiguration();
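
Image transfers now go through a process-wide URLConnectionFactory, initialized once in the static block above and responsible for SPNEGO negotiation when security is enabled, instead of SecurityUtil.openSecureHttpConnection(). The open path reduces to the following, with AuthenticationException rewrapped so callers keep seeing IOException:

    HttpURLConnection conn;
    try {
      conn = (HttpURLConnection)
          connectionFactory.openConnection(url, isSpnegoEnabled);
    } catch (AuthenticationException e) {
      throw new IOException(e);  // preserve the old failure mode for callers
    }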

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java Mon Dec  2 17:41:44 2013
@@ -432,8 +432,8 @@ public class INodeDirectorySnapshottable
           parentPath.remove(parentPath.size() - 1);
         }
       }
-    } else if (node.isFile() && node.asFile() instanceof FileWithSnapshot) {
-      FileWithSnapshot file = (FileWithSnapshot) node.asFile();
+    } else if (node.isFile() && node.asFile() instanceof INodeFileWithSnapshot) {
+      INodeFileWithSnapshot file = (INodeFileWithSnapshot) node.asFile();
       Snapshot earlierSnapshot = diffReport.isFromEarlier() ? diffReport.from
           : diffReport.to;
       Snapshot laterSnapshot = diffReport.isFromEarlier() ? diffReport.to
@@ -441,7 +441,7 @@ public class INodeDirectorySnapshottable
       boolean change = file.getDiffs().changedBetweenSnapshots(earlierSnapshot,
           laterSnapshot);
       if (change) {
-        diffReport.addFileDiff(file.asINodeFile(), relativePath);
+        diffReport.addFileDiff(file, relativePath);
       }
     }
   }

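The file branch above now relies on FileDiffList.changedBetweenSnapshots to decide whether a file belongs in the diff report, after normalizing from/to into chronological order via isFromEarlier(). As a rough, self-contained model of what such a test involves (an assumption about its internals, not taken from this commit): each file keeps an ordered list of diffs keyed by snapshot, and the file counts as changed between two snapshots when some diff falls inside the interval.

    import java.util.List;

    /** Toy model; the real FileDiffList may differ in boundary handling. */
    class ChangedBetweenSketch {
      /** diffSnapshotIds: ascending ids of snapshots at which diffs exist. */
      static boolean changedBetween(List<Integer> diffSnapshotIds,
          int earlierId, int laterId) {
        for (int id : diffSnapshotIds) {
          if (id >= earlierId && id < laterId) {
            return true; // a modification was recorded inside the interval
          }
        }
        return false;
      }
    }
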
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java Mon Dec  2 17:41:44 2013
@@ -37,7 +37,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryAttributes;
-import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
 import org.apache.hadoop.hdfs.server.namenode.INodeMap;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
 import org.apache.hadoop.hdfs.server.namenode.Quota;
@@ -55,7 +54,7 @@ import com.google.common.base.Preconditi
  * storing snapshot data. When there are modifications to the directory, the old
  * data is stored in the latest snapshot, if there is any.
  */
-public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
+public class INodeDirectoryWithSnapshot extends INodeDirectory {
   /**
    * The difference between the current state and a previous snapshot
    * of the children list of an INodeDirectory.
@@ -185,14 +184,10 @@ public class INodeDirectoryWithSnapshot 
         INode dnode = deleted.get(d);
         if (cnode.compareTo(dnode.getLocalNameBytes()) == 0) {
           fullPath[fullPath.length - 1] = cnode.getLocalNameBytes();
-          if (cnode.isSymlink() && dnode.isSymlink()) {
-            dList.add(new DiffReportEntry(DiffType.MODIFY, fullPath));
-          } else {
-            // must be the case: delete first and then create an inode with the
-            // same name
-            cList.add(new DiffReportEntry(DiffType.CREATE, fullPath));
-            dList.add(new DiffReportEntry(DiffType.DELETE, fullPath));
-          }
+          // must be the case: delete first and then create an inode with the
+          // same name
+          cList.add(new DiffReportEntry(DiffType.CREATE, fullPath));
+          dList.add(new DiffReportEntry(DiffType.DELETE, fullPath));
           c++;
           d++;
         } else if (cnode.compareTo(dnode.getLocalNameBytes()) < 0) {
@@ -490,7 +485,7 @@ public class INodeDirectoryWithSnapshot 
 
   INodeDirectoryWithSnapshot(INodeDirectory that, boolean adopt,
       DirectoryDiffList diffs) {
-    super(that, adopt, that.getQuotaCounts());
+    super(that, adopt, true);
     this.diffs = diffs != null? diffs: new DirectoryDiffList();
   }
 
@@ -775,8 +770,8 @@ public class INodeDirectoryWithSnapshot 
         removedINodes, priorDeleted, countDiffChange));
     
     if (isQuotaSet()) {
-      this.addSpaceConsumed2Cache(-counts.get(Quota.NAMESPACE),
-          -counts.get(Quota.DISKSPACE));
+      getDirectoryWithQuotaFeature().addSpaceConsumed2Cache(
+          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
     }
     return counts;
   }
@@ -809,10 +804,10 @@ public class INodeDirectoryWithSnapshot 
         // For DstReference node, since the node is not in the created list of
         // prior, we should treat it as regular file/dir
       } else if (topNode.isFile()
-          && topNode.asFile() instanceof FileWithSnapshot) {
-        FileWithSnapshot fs = (FileWithSnapshot) topNode.asFile();
-        counts.add(fs.getDiffs().deleteSnapshotDiff(post, prior,
-            topNode.asFile(), collectedBlocks, removedINodes, countDiffChange));
+          && topNode.asFile() instanceof INodeFileWithSnapshot) {
+        INodeFileWithSnapshot fs = (INodeFileWithSnapshot) topNode.asFile();
+        counts.add(fs.getDiffs().deleteSnapshotDiff(post, prior, fs,
+            collectedBlocks, removedINodes, countDiffChange));
       } else if (topNode.isDirectory()) {
         INodeDirectory dir = topNode.asDirectory();
         ChildrenDiff priorChildrenDiff = null;

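Across these hunks, quota support moves from inheritance (the deleted extends INodeDirectoryWithQuota) to composition: a directory that needs quota enforcement carries a DirectoryWithQuotaFeature, reached through getDirectoryWithQuotaFeature(), so plain directories pay nothing for quota fields. A self-contained toy of that shape; the method names come from the diff, while the fields and bookkeeping are assumptions:

    /** Toy directory with an optional quota feature attached. */
    class DirectorySketch {
      private QuotaFeatureSketch quota; // null means no quota is set

      boolean isQuotaSet() { return quota != null; }

      void addDirectoryWithQuotaFeature(long nsQuota, long dsQuota) {
        quota = new QuotaFeatureSketch(nsQuota, dsQuota);
      }

      QuotaFeatureSketch getDirectoryWithQuotaFeature() { return quota; }
    }

    /** Toy stand-in for DirectoryWithQuotaFeature. */
    class QuotaFeatureSketch {
      private long nsQuota, dsQuota;       // limits
      private long nsConsumed, dsConsumed; // cached usage

      QuotaFeatureSketch(long nsQuota, long dsQuota) {
        this.nsQuota = nsQuota;
        this.dsQuota = dsQuota;
      }

      void setQuota(long nsQuota, long dsQuota) {
        this.nsQuota = nsQuota;
        this.dsQuota = dsQuota;
      }

      /** Delta-style update, matching the call sites in the hunk above. */
      void addSpaceConsumed2Cache(long nsDelta, long dsDelta) {
        nsConsumed += nsDelta;
        dsConsumed += dsDelta;
      }
    }
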
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java Mon Dec  2 17:41:44 2013
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
@@ -31,14 +32,13 @@ import org.apache.hadoop.hdfs.server.nam
  * Represent an {@link INodeFile} that is snapshotted.
  */
 @InterfaceAudience.Private
-public class INodeFileWithSnapshot extends INodeFile
-    implements FileWithSnapshot {
+public class INodeFileWithSnapshot extends INodeFile {
   private final FileDiffList diffs;
   private boolean isCurrentFileDeleted = false;
 
   public INodeFileWithSnapshot(INodeFile f) {
-    this(f, f instanceof FileWithSnapshot?
-        ((FileWithSnapshot)f).getDiffs(): null);
+    this(f, f instanceof INodeFileWithSnapshot ? 
+        ((INodeFileWithSnapshot) f).getDiffs() : null);
   }
 
   public INodeFileWithSnapshot(INodeFile f, FileDiffList diffs) {
@@ -46,12 +46,12 @@ public class INodeFileWithSnapshot exten
     this.diffs = diffs != null? diffs: new FileDiffList();
   }
 
-  @Override
+  /** Is the current file deleted? */
   public boolean isCurrentFileDeleted() {
     return isCurrentFileDeleted;
   }
   
-  @Override
+  /** Delete the file from the current tree */
   public void deleteCurrentFile() {
     isCurrentFileDeleted = true;
   }
@@ -70,12 +70,7 @@ public class INodeFileWithSnapshot exten
     return this;
   }
 
-  @Override
-  public INodeFile asINodeFile() {
-    return this;
-  }
-
-  @Override
+  /** @return the file diff list. */
   public FileDiffList getDiffs() {
     return diffs;
   }
@@ -90,7 +85,7 @@ public class INodeFileWithSnapshot exten
         recordModification(prior, null);
         deleteCurrentFile();
       }
-      Util.collectBlocksAndClear(this, collectedBlocks, removedINodes);
+      this.collectBlocksAndClear(collectedBlocks, removedINodes);
       return Quota.Counts.newInstance();
     } else { // delete a snapshot
       prior = getDiffs().updatePrior(snapshot, prior);
@@ -104,4 +99,100 @@ public class INodeFileWithSnapshot exten
     return super.toDetailString()
         + (isCurrentFileDeleted()? "(DELETED), ": ", ") + diffs;
   }
+  
+  /** 
+   * @return block replication, which is the max file replication among
+   *         the file and the diff list.
+   */
+  @Override
+  public short getBlockReplication() {
+    short max = isCurrentFileDeleted() ? 0 : getFileReplication();
+    for(FileDiff d : getDiffs()) {
+      if (d.snapshotINode != null) {
+        final short replication = d.snapshotINode.getFileReplication();
+        if (replication > max) {
+          max = replication;
+        }
+      }
+    }
+    return max;
+  }
+  
+  /**
+   * If some blocks at the end of the block list no longer belong to
+   * any inode, collect them and update the block list.
+   */
+  void collectBlocksAndClear(final BlocksMapUpdateInfo info,
+      final List<INode> removedINodes) {
+    // check if everything is deleted.
+    if (isCurrentFileDeleted() && getDiffs().asList().isEmpty()) {
+      destroyAndCollectBlocks(info, removedINodes);
+      return;
+    }
+
+    // find max file size.
+    final long max;
+    if (isCurrentFileDeleted()) {
+      final FileDiff last = getDiffs().getLast();
+      max = last == null? 0: last.getFileSize();
+    } else { 
+      max = computeFileSize();
+    }
+
+    collectBlocksBeyondMax(max, info);
+  }
+
+  private void collectBlocksBeyondMax(final long max,
+      final BlocksMapUpdateInfo collectedBlocks) {
+    final BlockInfo[] oldBlocks = getBlocks();
+    if (oldBlocks != null) {
+      // find the minimum n such that the size of the first n blocks >= max
+      int n = 0;
+      for(long size = 0; n < oldBlocks.length && max > size; n++) {
+        size += oldBlocks[n].getNumBytes();
+      }
+      
+      // starting from block n, the data is beyond max.
+      if (n < oldBlocks.length) {
+        // resize the array.  
+        final BlockInfo[] newBlocks;
+        if (n == 0) {
+          newBlocks = null;
+        } else {
+          newBlocks = new BlockInfo[n];
+          System.arraycopy(oldBlocks, 0, newBlocks, 0, n);
+        }
+        
+        // set new blocks
+        setBlocks(newBlocks);
+
+        // collect the blocks beyond max.  
+        if (collectedBlocks != null) {
+          for(; n < oldBlocks.length; n++) {
+            collectedBlocks.addDeleteBlock(oldBlocks[n]);
+          }
+        }
+      }
+    }
+  }
+  
+  Quota.Counts updateQuotaAndCollectBlocks(FileDiff removed,
+      BlocksMapUpdateInfo collectedBlocks, final List<INode> removedINodes) {
+    long oldDiskspace = this.diskspaceConsumed();
+    if (removed.snapshotINode != null) {
+      short replication = removed.snapshotINode.getFileReplication();
+      short currentRepl = getBlockReplication();
+      if (currentRepl == 0) {
+        oldDiskspace = computeFileSize(true, true) * replication;
+      } else if (replication > currentRepl) {  
+        oldDiskspace = oldDiskspace / getBlockReplication()
+            * replication;
+      }
+    }
+    
+    this.collectBlocksAndClear(collectedBlocks, removedINodes);
+    
+    long dsDelta = oldDiskspace - diskspaceConsumed();
+    return Quota.Counts.newInstance(0, dsDelta);
+  }
 }

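The new collectBlocksBeyondMax keeps the shortest prefix of the block list whose total size covers the largest file size still referenced by any snapshot, and hands everything past that prefix back for deletion. The same arithmetic in a self-contained sketch, with a long[] of block sizes standing in for BlockInfo[] and a plain list standing in for BlocksMapUpdateInfo:

    import java.util.Arrays;
    import java.util.List;

    class TruncateSketch {
      /**
       * Keep the shortest prefix of blocks covering max bytes; sizes past
       * that prefix go to 'collected'. Returns the surviving block sizes,
       * or null when no block survives (mirroring newBlocks = null above).
       */
      static long[] truncateBeyondMax(long[] blockSizes, long max,
          List<Long> collected) {
        // find the minimum n such that the size of the first n blocks >= max
        int n = 0;
        for (long size = 0; n < blockSizes.length && max > size; n++) {
          size += blockSizes[n];
        }
        if (n == blockSizes.length) {
          return blockSizes; // nothing lies beyond max
        }
        for (int i = n; i < blockSizes.length; i++) {
          collected.add(blockSizes[i]); // blocks to be reclaimed
        }
        return n == 0 ? null : Arrays.copyOf(blockSizes, n);
      }
    }
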
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/Snapshot.java Mon Dec  2 17:41:44 2013
@@ -136,7 +136,7 @@ public class Snapshot implements Compara
   /** The root directory of the snapshot. */
   static public class Root extends INodeDirectory {
     Root(INodeDirectory other) {
-      super(other, false);
+      super(other, false, false);
     }
 
     @Override

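The only change here is a third argument in the call to the INodeDirectory copy constructor. Read alongside the quota hunks above, the extra boolean plausibly controls whether attached features are copied into the new node, with snapshot roots opting out; that reading is an assumption, sketched below:

    /** Toy copy constructor with an assumed copy-features flag. */
    class CopySketch {
      private final java.util.List<Object> features = new java.util.ArrayList<>();

      CopySketch() { }

      CopySketch(CopySketch other, boolean adopt, boolean copyFeatures) {
        if (copyFeatures) {
          features.addAll(other.features); // Snapshot.Root would pass false
        }
        // 'adopt' (children ownership) is outside this sketch
      }
    }
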
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotFSImageFormat.java Mon Dec  2 17:41:44 2013
@@ -36,8 +36,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.INodeFileAttributes;
 import org.apache.hadoop.hdfs.server.namenode.INodeReference;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot.DirectoryDiffList;
 import org.apache.hadoop.hdfs.tools.snapshot.SnapshotDiff;
@@ -99,8 +97,8 @@ public class SnapshotFSImageFormat {
   
   public static void saveFileDiffList(final INodeFile file,
       final DataOutput out) throws IOException {
-    saveINodeDiffs(file instanceof FileWithSnapshot?
-        ((FileWithSnapshot)file).getDiffs(): null, out, null);
+    saveINodeDiffs(file instanceof INodeFileWithSnapshot?
+        ((INodeFileWithSnapshot) file).getDiffs(): null, out, null);
   }
 
   public static FileDiffList loadFileDiffList(DataInput in,

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java Mon Dec  2 17:41:44 2013
@@ -29,11 +29,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachePool;
 import org.apache.hadoop.hdfs.tools.TableListing.Justification;
 import org.apache.hadoop.ipc.RemoteException;
@@ -131,7 +133,8 @@ public class CacheAdmin extends Configur
     @Override
     public String getShortUsage() {
       return "[" + getName() +
-          " -path <path> -replication <replication> -pool <pool-name>]\n";
+          " -path <path> -pool <pool-name> " +
+          "[-replication <replication>] [-ttl <time-to-live>]]\n";
     }
 
     @Override
@@ -139,11 +142,15 @@ public class CacheAdmin extends Configur
       TableListing listing = getOptionDescriptionListing();
       listing.addRow("<path>", "A path to cache. The path can be " +
           "a directory or a file.");
-      listing.addRow("<replication>", "The cache replication factor to use. " +
-          "Defaults to 1.");
       listing.addRow("<pool-name>", "The pool to which the directive will be " +
           "added. You must have write permission on the cache pool "
           + "in order to add new directives.");
+      listing.addRow("<replication>", "The cache replication factor to use. " +
+          "Defaults to 1.");
+      listing.addRow("<time-to-live>", "How long the directive is " +
+          "valid. Can be specified in seconds, minutes, hours, and days, " +
+          "e.g. 30m, 4h, 2d. Valid units are [smhd]." +
+          " If unspecified, the directive never expires.");
       return getShortUsage() + "\n" +
         "Add a new cache directive.\n\n" +
         listing.toString();
@@ -151,33 +158,48 @@ public class CacheAdmin extends Configur
 
     @Override
     public int run(Configuration conf, List<String> args) throws IOException {
+      CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder();
+
       String path = StringUtils.popOptionWithArgument("-path", args);
       if (path == null) {
         System.err.println("You must specify a path with -path.");
         return 1;
       }
-      short replication = 1;
-      String replicationString =
-          StringUtils.popOptionWithArgument("-replication", args);
-      if (replicationString != null) {
-        replication = Short.parseShort(replicationString);
-      }
+      builder.setPath(new Path(path));
+
       String poolName = StringUtils.popOptionWithArgument("-pool", args);
       if (poolName == null) {
         System.err.println("You must specify a pool name with -pool.");
         return 1;
       }
+      builder.setPool(poolName);
+
+      String replicationString =
+          StringUtils.popOptionWithArgument("-replication", args);
+      if (replicationString != null) {
+        Short replication = Short.parseShort(replicationString);
+        builder.setReplication(replication);
+      }
+
+      String ttlString = StringUtils.popOptionWithArgument("-ttl", args);
+      if (ttlString != null) {
+        try {
+          long ttl = DFSUtil.parseRelativeTime(ttlString);
+          builder.setExpiration(CacheDirectiveInfo.Expiration.newRelative(ttl));
+        } catch (IOException e) {
+          System.err.println(
+              "Error while parsing ttl value: " + e.getMessage());
+          return 1;
+        }
+      }
+
       if (!args.isEmpty()) {
         System.err.println("Can't understand argument: " + args.get(0));
         return 1;
       }
         
       DistributedFileSystem dfs = getDFS(conf);
-      CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder().
-          setPath(new Path(path)).
-          setReplication(replication).
-          setPool(poolName).
-          build();
+      CacheDirectiveInfo directive = builder.build();
       try {
         long id = dfs.addCacheDirective(directive);
         System.out.println("Added cache directive " + id);
@@ -260,7 +282,7 @@ public class CacheAdmin extends Configur
     public String getShortUsage() {
       return "[" + getName() +
           " -id <id> [-path <path>] [-replication <replication>] " +
-          "[-pool <pool-name>] ]\n";
+          "[-pool <pool-name>] [-ttl <time-to-live>]]\n";
     }
 
     @Override
@@ -274,6 +296,10 @@ public class CacheAdmin extends Configur
       listing.addRow("<pool-name>", "The pool to which the directive will be " +
           "added. You must have write permission on the cache pool "
           + "in order to move a directive into it. (optional)");
+      listing.addRow("<time-to-live>", "How long the directive is " +
+          "valid. Can be specified in seconds, minutes, hours, and days, " +
+          "e.g. 30m, 4h, 2d. Valid units are [smhd]." +
+          " If unspecified, the directive never expires.");
       return getShortUsage() + "\n" +
         "Modify a cache directive.\n\n" +
         listing.toString();
@@ -307,6 +333,19 @@ public class CacheAdmin extends Configur
         builder.setPool(poolName);
         modified = true;
       }
+      String ttlString = StringUtils.popOptionWithArgument("-ttl", args);
+      if (ttlString != null) {
+        long ttl;
+        try {
+          ttl = DFSUtil.parseRelativeTime(ttlString);
+        } catch (IOException e) {
+          System.err.println(
+              "Error while parsing ttl value: " + e.getMessage());
+          return 1;
+        }
+        builder.setExpiration(CacheDirectiveInfo.Expiration.newRelative(ttl));
+        modified = true;
+      }
       if (!args.isEmpty()) {
         System.err.println("Can't understand argument: " + args.get(0));
         System.err.println("Usage is " + getShortUsage());
@@ -434,7 +473,8 @@ public class CacheAdmin extends Configur
       TableListing.Builder tableBuilder = new TableListing.Builder().
           addField("ID", Justification.RIGHT).
           addField("POOL", Justification.LEFT).
-          addField("REPLICATION", Justification.RIGHT).
+          addField("REPL", Justification.RIGHT).
+          addField("EXPIRY", Justification.LEFT).
           addField("PATH", Justification.LEFT);
       if (printStats) {
         tableBuilder.addField("NEEDED", Justification.RIGHT).
@@ -455,6 +495,14 @@ public class CacheAdmin extends Configur
         row.add("" + directive.getId());
         row.add(directive.getPool());
         row.add("" + directive.getReplication());
+        String expiry;
+        if (directive.getExpiration().getMillis() ==
+            CacheDirectiveInfo.Expiration.EXPIRY_NEVER) {
+          expiry = "never";
+        } else {
+          expiry = directive.getExpiration().toString();
+        }
+        row.add(expiry);
         row.add(directive.getPath().toUri().getPath());
         if (printStats) {
           row.add("" + stats.getBytesNeeded());
@@ -755,9 +803,10 @@ public class CacheAdmin extends Configur
           build();
       int numResults = 0;
       try {
-        RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+        RemoteIterator<CachePoolEntry> iter = dfs.listCachePools();
         while (iter.hasNext()) {
-          CachePoolInfo info = iter.next();
+          CachePoolEntry entry = iter.next();
+          CachePoolInfo info = entry.getInfo();
           String[] row = new String[5];
           if (name == null || info.getPoolName().equals(name)) {
             row[0] = info.getPoolName();
@@ -822,14 +871,15 @@ public class CacheAdmin extends Configur
         return 0;
       }
       String commandName = args.get(0);
-      Command command = determineCommand(commandName);
+      // prepend a dash to match against the command names
+      Command command = determineCommand("-"+commandName);
       if (command == null) {
         System.err.print("Sorry, I don't know the command '" +
           commandName + "'.\n");
-        System.err.print("Valid command names are:\n");
+        System.err.print("Valid help command names are:\n");
         String separator = "";
         for (Command c : COMMANDS) {
-          System.err.print(separator + c.getName());
+          System.err.print(separator + c.getName().substring(1));
           separator = ", ";
         }
         System.err.print("\n");

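The new -ttl option leans on DFSUtil.parseRelativeTime to turn strings like 30m, 4h, or 2d into a duration before building a relative Expiration. What follows is a hedged re-creation of what such a parser has to do, derived from the help text ("Valid units are [smhd]") rather than from the real method body, which may differ in details:

    import java.io.IOException;

    class RelativeTimeSketch {
      /** Parse "30m"-style strings (units [smhd]) into milliseconds. */
      static long parseRelativeTime(String s) throws IOException {
        if (s.length() < 2) {
          throw new IOException("Unable to parse relative time value of " + s);
        }
        final long value;
        try {
          value = Long.parseLong(s.substring(0, s.length() - 1));
        } catch (NumberFormatException e) {
          throw new IOException("Unable to parse relative time value of " + s);
        }
        switch (s.charAt(s.length() - 1)) {
          case 's': return value * 1000L;
          case 'm': return value * 60L * 1000L;
          case 'h': return value * 60L * 60L * 1000L;
          case 'd': return value * 24L * 60L * 60L * 1000L;
          default:
            throw new IOException("Unknown time unit in " + s + "; use [smhd]");
        }
      }
    }

For example, parseRelativeTime("4h") would yield 14400000 milliseconds, which the add command then passes to CacheDirectiveInfo.Expiration.newRelative.
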
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java Mon Dec  2 17:41:44 2013
@@ -36,9 +36,10 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.http.HttpConfig;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -82,18 +83,28 @@ public class DFSck extends Configured im
       + "\t-delete\tdelete corrupted files\n"
       + "\t-files\tprint out files being checked\n"
       + "\t-openforwrite\tprint out files opened for write\n"
+      + "\t-includeSnapshots\tinclude snapshot data if the given path"
+      + " indicates a snapshottable directory or there are "
+      + "snapshottable directories under it\n"
       + "\t-list-corruptfileblocks\tprint out list of missing "
       + "blocks and files they belong to\n"
       + "\t-blocks\tprint out block report\n"
       + "\t-locations\tprint out locations for every block\n"
-      + "\t-racks\tprint out network topology for data-node locations\n"
-      + "\t\tBy default fsck ignores files opened for write, "
+      + "\t-racks\tprint out network topology for data-node locations\n\n"
+      + "Please Note:\n"
+      + "\t1. By default fsck ignores files opened for write, "
       + "use -openforwrite to report such files. They are usually "
       + " tagged CORRUPT or HEALTHY depending on their block "
-      + "allocation status";
+      + "allocation status\n"
+      + "\t2. Option -includeSnapshots should not be used for comparing stats;"
+      + " it should be used only for a HEALTH check, as the report may contain"
+      + " duplicates if the same file is present in both the original fs tree "
+      + "and inside snapshots.";
   
   private final UserGroupInformation ugi;
   private final PrintStream out;
+  private final URLConnectionFactory connectionFactory;
+  private final boolean isSpnegoEnabled;
 
   /**
    * Filesystem checker.
@@ -107,6 +118,9 @@ public class DFSck extends Configured im
     super(conf);
     this.ugi = UserGroupInformation.getCurrentUser();
     this.out = out;
+    this.connectionFactory = URLConnectionFactory
+        .newDefaultURLConnectionFactory(conf);
+    this.isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
   }
 
   /**
@@ -158,7 +172,12 @@ public class DFSck extends Configured im
         url.append("&startblockafter=").append(String.valueOf(cookie));
       }
       URL path = new URL(url.toString());
-      URLConnection connection = SecurityUtil.openSecureHttpConnection(path);
+      URLConnection connection;
+      try {
+        connection = connectionFactory.openConnection(path, isSpnegoEnabled);
+      } catch (AuthenticationException e) {
+        throw new IOException(e);
+      }
       InputStream stream = connection.getInputStream();
       BufferedReader input = new BufferedReader(new InputStreamReader(
           stream, "UTF-8"));
@@ -255,6 +274,8 @@ public class DFSck extends Configured im
       else if (args[idx].equals("-list-corruptfileblocks")) {
         url.append("&listcorruptfileblocks=1");
         doListCorruptFileBlocks = true;
+      } else if (args[idx].equals("-includeSnapshots")) {
+        url.append("&includeSnapshots=1");
       } else if (!args[idx].startsWith("-")) {
         if (null == dir) {
           dir = args[idx];
@@ -278,7 +299,12 @@ public class DFSck extends Configured im
       return listCorruptFileBlocks(dir, url.toString());
     }
     URL path = new URL(url.toString());
-    URLConnection connection = SecurityUtil.openSecureHttpConnection(path);
+    URLConnection connection;
+    try {
+      connection = connectionFactory.openConnection(path, isSpnegoEnabled);
+    } catch (AuthenticationException e) {
+      throw new IOException(e);
+    }
     InputStream stream = connection.getInputStream();
     BufferedReader input = new BufferedReader(new InputStreamReader(
                                               stream, "UTF-8"));

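Besides switching to a per-instance connectionFactory built from the tool's own Configuration (unlike the static one in TransferFsImage), DFSck gains an -includeSnapshots flag that simply adds another query parameter to the fsck servlet URL. A self-contained sketch of that flag-to-query mapping; the base URL below is a placeholder, and only the parameter names are taken from the hunks:

    import java.util.List;

    class FsckUrlSketch {
      static String buildUrl(List<String> args) {
        StringBuilder url =
            new StringBuilder("http://namenode:50070/fsck?path=%2F");
        for (String arg : args) {
          if (arg.equals("-list-corruptfileblocks")) {
            url.append("&listcorruptfileblocks=1");
          } else if (arg.equals("-includeSnapshots")) {
            url.append("&includeSnapshots=1"); // flag introduced in this hunk
          }
        }
        return url.toString();
      }
    }
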
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Mon Dec  2 17:41:44 2013
@@ -145,7 +145,7 @@ public class DelegationTokenFetcher {
     // default to using the local file system
     FileSystem local = FileSystem.getLocal(conf);
     final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
-    final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
+    final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
 
     // Login the current user
     UserGroupInformation.getCurrentUser().doAs(

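DelegationTokenFetcher keeps using a shared default factory, now under the DEFAULT_SYSTEM_CONNECTION_FACTORY name, presumably because no per-filesystem Configuration is in scope at that point. The commit-wide pattern, sketched with an illustrative helper that is not part of the API:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.web.URLConnectionFactory;

    class FactoryChoiceSketch {
      /** Conf-driven factory when a Configuration exists, system default otherwise. */
      static URLConnectionFactory choose(Configuration conf) {
        return conf != null
            ? URLConnectionFactory.newDefaultURLConnectionFactory(conf)
            : URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
      }
    }
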
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java Mon Dec  2 17:41:44 2013
@@ -176,10 +176,9 @@ public class HftpFileSystem extends File
    * Initialize connectionFactory and tokenAspect. This function is intended to
    * be overridden by HsFtpFileSystem.
    */
-  protected void initConnectionFactoryAndTokenAspect(Configuration conf)
+  protected void initTokenAspect(Configuration conf)
       throws IOException {
     tokenAspect = new TokenAspect<HftpFileSystem>(this, TOKEN_KIND);
-    connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
   }
 
   @Override
@@ -187,6 +186,8 @@ public class HftpFileSystem extends File
   throws IOException {
     super.initialize(name, conf);
     setConf(conf);
+    this.connectionFactory = URLConnectionFactory
+        .newDefaultURLConnectionFactory(conf);
     this.ugi = UserGroupInformation.getCurrentUser();
     this.nnUri = getNamenodeUri(name);
 
@@ -197,7 +198,7 @@ public class HftpFileSystem extends File
       throw new IllegalArgumentException(e);
     }
 
-    initConnectionFactoryAndTokenAspect(conf);
+    initTokenAspect(conf);
     if (UserGroupInformation.isSecurityEnabled()) {
       tokenAspect.initDelegationToken(ugi);
     }
@@ -338,7 +339,7 @@ public class HftpFileSystem extends File
   }
 
   static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
-    URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
+    URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_SYSTEM_CONNECTION_FACTORY;
 
     RangeHeaderUrlOpener(final URL url) {
       super(url);

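Taken together, the HftpFileSystem hunks pull the connection factory out of the token hook (renamed from initConnectionFactoryAndTokenAspect to initTokenAspect) and into initialize, so the factory always reflects the live Configuration while subclasses override only the token wiring. Reassembled from the hunk's context lines, with the URISyntaxException handling shown in the diff elided:

    @Override
    public void initialize(URI name, Configuration conf) throws IOException {
      super.initialize(name, conf);
      setConf(conf);
      this.connectionFactory =
          URLConnectionFactory.newDefaultURLConnectionFactory(conf);
      this.ugi = UserGroupInformation.getCurrentUser();
      this.nnUri = getNamenodeUri(name);
      initTokenAspect(conf);
      if (UserGroupInformation.isSecurityEnabled()) {
        tokenAspect.initDelegationToken(ugi);
      }
    }
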

